Страницы

Поиск по вопросам

среда, 3 апреля 2019 г.

Python2.7 ускорение работы кода, сложение огромных 2хмерных массивов

Есть два двумерных массива (например, ndarray):
3000х4000 (12 000 000 элементов) - 2,5D карта высот - назовем Массив1 100Х100 (10 000 элементов) - форма фрезы - назовем Массив2
Задача: нужно обойти "карту высот" "фрезой" и рассчитать 3й массив "оставшийся недорез" для каждого пикселя из Массива1.
Проблема в том, что получается 3000Х4000Х100Х100=120 000 000 000 точек, и над каждой нужно провести одну операцию вычитания, одну операцию сравнения на меньше, и несколько операций записи.
Python 2.7 делает это ооочень медленно(на моем компе рассчитывает около 14 часов)
Подскажите, пожалуйста, как можно ускорить работу python скрипта?
Используемый алгоритм простой:
Цикл X по СтрокамМатрицы1: #3000 Цикл Y по СтрокамМатрицы1: #4000 Цикл N по СтрокамМатрицы2: #100 Цикл M по СтрокамМатрицы2: #100 ВремТочка = матр1[X+N,Y+M] - матр2[N,M] Если Матр3[X+N,Y+M] > ВремТочка: Матр3[X+N,Y+M] = ВремТочка
В реальности есть еще несколько дополнительных операций, но на суть это не влияет.
Вот используемый код:
def get_rmf_map_tool(self,offset_image,tool_roughing,previous_offset,pixelstep_roughing):
#Dictionary faster then array map_tool1 = {} #numpy.zeros((self.w, self.h), numpy.float32) - self.image.min() y = x = 0 #init y,x for speed
ts_roughing = tool_roughing.shape[0]
max_line,max_pix = self.get_im_maxmin(self.w, self.h, ts_roughing)
jrange = self.mxrange(self.row_mill, max_line, pixelstep_roughing) irange = self.mxrange(self.row_mill, max_pix) ln = max_line
if prn_detal > 0: print "(Previous tool shape: {0} pixels, max line: {1}, max pixel: {2} )".format(ts_roughing,max_line,max_pix)
trange = xrange(0,ts_roughing)
for lin in jrange: #lines progress(lin, ln) for pix in irange: #pixels
if self.row_mill: y,x = lin,pix else: x,y = lin,pix
m1 = offset_image[y:y+ts_roughing, x:x+ts_roughing] hhh1 = (m1 - tool_roughing).max() + previous_offset
for i in trange: #lines tool for j in trange: #pixels tool
t = tool_roughing[i,j] if isinf(t): continue
ty = i+y tx = j+x # self.image[ty,tx] <= hhh1 !!! t >= 0 dt = -self.image[ty,tx] + hhh1 + t #dt = round(dt,16)
if dt < -epsilon and prn_detal > -1: print(" delta < -0.00001 ",ty,tx,dt,self.image[ty,tx], hhh1, t) if dt < .0: dt = .0
try: if map_tool1[ty,tx] > dt: #finde MAX for this pixel map_tool1[ty,tx] = dt except KeyError: map_tool1[ty,tx] = dt
if prn_detal > 0: print "(End make map tool1. Map len: {0} pixels. End at {1})".format(len(map_tool1),datetime.now()) if len(map_tool1) == 0 and prn_detal > -1: print "(WARNING! Map tool1 len: {0}! )".format(len(map_tool1))
Принимаются любые предложения: ассемблер, ctypes, cython, средствами графического процессора, OpenGL, Numpy, CUDA, ... Желательно с примером.


Ответ

Увы, если ничего принципиально не менять в алгоритме, то разве что средствами графического процессора. На таких данных, пишите вы для процессора хоть на ассемблере, быстро не получится. Вот пример, чтоб не быть голословным:
int main() { int i, j, k, l; long z = 0;
for (i = 0; i < 300; ++i) for (j = 0; j < 4000; ++j) for (k = 0; k < 100; ++k) for (l = 0; l < 100; ++l) z++; }
Как вы видите, я первый параметр уменьшил в 10 раз (ну допустим вы распараллелите на гипотетические 10 ядер), и вот, что получилось на не самом слабом компьютере:
$ gcc test.c -o test $ time ./test ./test 31,72s user 0,00s system 99% cpu 31,728 total
То есть почти 32 секунды, и это на чистом C с минимумом операций. Ну, будет, допустим, 20 секунд на самом современном процессоре, вряд ли это можно назвать быстро.
Обновление: написал вашу логику на Cython:
import numpy as np cimport numpy as np
def calc( np.ndarray[np.double_t, ndim=2] A, np.ndarray[np.double_t, ndim=2] B, np.ndarray[np.double_t, ndim=2] C):
cdef int N1 = A.shape[0] cdef int M1 = A.shape[1] cdef int N2 = B.shape[0] cdef int M2 = B.shape[1] cdef int i, j, k, l cdef double tp
if N1 != C.shape[0] or M1 != C.shape[1]: raise ValueError('Array dimensions mismatch')
for k in range(N2): for l in range(M2): for i in range(N1 - N2): for j in range(M1 - M2): tp = A[i + k, j + l] - B[k, l] if C[i + k, j + l] > tp: C[i + k, j + l] = tp
Тестовый код (опять в 10 раз меньше данных, чтоб не ждать долго):
import numpy as np import calc
N, M = 300, 4000 n, m = 100, 100
A = np.random.random((N, M)) B = np.random.random((n, m)) C = np.random.random((N, M))
calc.calc(A, B, C)
Вот пример запуска:
$ time python2 test.py python2 test.py 36,68s user 0,05s system 99% cpu 36,734 total
То есть ненамного дольше, чем на чистом C. На полных данных точно укладывается в ваше время.
Обновление 2: ради интереса запустил на полных данных (массив размера 3000x4000), и получил следующий результат:
$ time python2 test.py python2 test.py 1572,55s user 2,11s system 99% cpu 26:16,10 total
В 30 минут уложились, но, конечно, получилось не так быстро, как хотелось бы. Предполагаю, это из-за того, что данные перестали помещаться в кэш. Скорее всего, если разбить большой массив на 10 меньших (которые будут попадать в кэш) и последовательно обработать их, получится гораздо быстрее. Можете попробовать сами, в обработке таких массивов будет пара тонких моментов, так что в рамках ответа на вопрос мне такой код писать лень.

Комментариев нет:

Отправить комментарий