#python #оптимизация #python_2x
Есть два двумерных массива (например, ndarray): 3000х4000 (12 000 000 элементов) - 2,5D карта высот - назовем Массив1 100Х100 (10 000 элементов) - форма фрезы - назовем Массив2 Задача: нужно обойти "карту высот" "фрезой" и рассчитать 3й массив "оставшийся недорез" для каждого пикселя из Массива1. Проблема в том, что получается 3000Х4000Х100Х100=120 000 000 000 точек, и над каждой нужно провести одну операцию вычитания, одну операцию сравнения на меньше, и несколько операций записи. Python 2.7 делает это ооочень медленно(на моем компе рассчитывает около 14 часов) Подскажите, пожалуйста, как можно ускорить работу python скрипта? Используемый алгоритм простой: Цикл X по СтрокамМатрицы1: #3000 Цикл Y по СтрокамМатрицы1: #4000 Цикл N по СтрокамМатрицы2: #100 Цикл M по СтрокамМатрицы2: #100 ВремТочка = матр1[X+N,Y+M] - матр2[N,M] Если Матр3[X+N,Y+M] > ВремТочка: Матр3[X+N,Y+M] = ВремТочка В реальности есть еще несколько дополнительных операций, но на суть это не влияет. Вот используемый код: def get_rmf_map_tool(self,offset_image,tool_roughing,previous_offset,pixelstep_roughing): #Dictionary faster then array map_tool1 = {} #numpy.zeros((self.w, self.h), numpy.float32) - self.image.min() y = x = 0 #init y,x for speed ts_roughing = tool_roughing.shape[0] max_line,max_pix = self.get_im_maxmin(self.w, self.h, ts_roughing) jrange = self.mxrange(self.row_mill, max_line, pixelstep_roughing) irange = self.mxrange(self.row_mill, max_pix) ln = max_line if prn_detal > 0: print "(Previous tool shape: {0} pixels, max line: {1}, max pixel: {2} )".format(ts_roughing,max_line,max_pix) trange = xrange(0,ts_roughing) for lin in jrange: #lines progress(lin, ln) for pix in irange: #pixels if self.row_mill: y,x = lin,pix else: x,y = lin,pix m1 = offset_image[y:y+ts_roughing, x:x+ts_roughing] hhh1 = (m1 - tool_roughing).max() + previous_offset for i in trange: #lines tool for j in trange: #pixels tool t = tool_roughing[i,j] if isinf(t): continue ty = i+y tx = j+x # self.image[ty,tx] <= hhh1 !!! t >= 0 dt = -self.image[ty,tx] + hhh1 + t #dt = round(dt,16) if dt < -epsilon and prn_detal > -1: print(" delta < -0.00001 ",ty,tx,dt,self.image[ty,tx], hhh1, t) if dt < .0: dt = .0 try: if map_tool1[ty,tx] > dt: #finde MAX for this pixel map_tool1[ty,tx] = dt except KeyError: map_tool1[ty,tx] = dt if prn_detal > 0: print "(End make map tool1. Map len: {0} pixels. End at {1})".format(len(map_tool1),datetime.now()) if len(map_tool1) == 0 and prn_detal > -1: print "(WARNING! Map tool1 len: {0}! )".format(len(map_tool1)) Принимаются любые предложения: ассемблер, ctypes, cython, средствами графического процессора, OpenGL, Numpy, CUDA, ... Желательно с примером.
Ответы
Ответ 1
Увы, если ничего принципиально не менять в алгоритме, то разве что средствами графического процессора. На таких данных, пишите вы для процессора хоть на ассемблере, быстро не получится. Вот пример, чтоб не быть голословным: int main() { int i, j, k, l; long z = 0; for (i = 0; i < 300; ++i) for (j = 0; j < 4000; ++j) for (k = 0; k < 100; ++k) for (l = 0; l < 100; ++l) z++; } Как вы видите, я первый параметр уменьшил в 10 раз (ну допустим вы распараллелите на гипотетические 10 ядер), и вот, что получилось на не самом слабом компьютере: $ gcc test.c -o test $ time ./test ./test 31,72s user 0,00s system 99% cpu 31,728 total То есть почти 32 секунды, и это на чистом C с минимумом операций. Ну, будет, допустим, 20 секунд на самом современном процессоре, вряд ли это можно назвать быстро. Обновление: написал вашу логику на Cython: import numpy as np cimport numpy as np def calc( np.ndarray[np.double_t, ndim=2] A, np.ndarray[np.double_t, ndim=2] B, np.ndarray[np.double_t, ndim=2] C): cdef int N1 = A.shape[0] cdef int M1 = A.shape[1] cdef int N2 = B.shape[0] cdef int M2 = B.shape[1] cdef int i, j, k, l cdef double tp if N1 != C.shape[0] or M1 != C.shape[1]: raise ValueError('Array dimensions mismatch') for k in range(N2): for l in range(M2): for i in range(N1 - N2): for j in range(M1 - M2): tp = A[i + k, j + l] - B[k, l] if C[i + k, j + l] > tp: C[i + k, j + l] = tp Тестовый код (опять в 10 раз меньше данных, чтоб не ждать долго): import numpy as np import calc N, M = 300, 4000 n, m = 100, 100 A = np.random.random((N, M)) B = np.random.random((n, m)) C = np.random.random((N, M)) calc.calc(A, B, C) Вот пример запуска: $ time python2 test.py python2 test.py 36,68s user 0,05s system 99% cpu 36,734 total То есть ненамного дольше, чем на чистом C. На полных данных точно укладывается в ваше время. Обновление 2: ради интереса запустил на полных данных (массив размера 3000x4000), и получил следующий результат: $ time python2 test.py python2 test.py 1572,55s user 2,11s system 99% cpu 26:16,10 total В 30 минут уложились, но, конечно, получилось не так быстро, как хотелось бы. Предполагаю, это из-за того, что данные перестали помещаться в кэш. Скорее всего, если разбить большой массив на 10 меньших (которые будут попадать в кэш) и последовательно обработать их, получится гораздо быстрее. Можете попробовать сами, в обработке таких массивов будет пара тонких моментов, так что в рамках ответа на вопрос мне такой код писать лень.
Комментариев нет:
Отправить комментарий