Есть два двумерных массива (например, ndarray):
3000х4000 (12 000 000 элементов) - 2,5D карта высот - назовем Массив1
100Х100 (10 000 элементов) - форма фрезы - назовем Массив2
Задача: нужно обойти "карту высот" "фрезой" и рассчитать 3й массив "оставшийся недорез" для каждого пикселя из Массива1.
Проблема в том, что получается 3000Х4000Х100Х100=120 000 000 000 точек, и над каждой нужно провести одну операцию вычитания, одну операцию сравнения на меньше, и несколько операций записи.
Python 2.7 делает это ооочень медленно(на моем компе рассчитывает около 14 часов)
Подскажите, пожалуйста, как можно ускорить работу python скрипта?
Используемый алгоритм простой:
Цикл X по СтрокамМатрицы1: #3000
Цикл Y по СтрокамМатрицы1: #4000
Цикл N по СтрокамМатрицы2: #100
Цикл M по СтрокамМатрицы2: #100
ВремТочка = матр1[X+N,Y+M] - матр2[N,M]
Если Матр3[X+N,Y+M] > ВремТочка:
Матр3[X+N,Y+M] = ВремТочка
В реальности есть еще несколько дополнительных операций, но на суть это не влияет.
Вот используемый код:
def get_rmf_map_tool(self,offset_image,tool_roughing,previous_offset,pixelstep_roughing):
#Dictionary faster then array
map_tool1 = {} #numpy.zeros((self.w, self.h), numpy.float32) - self.image.min()
y = x = 0 #init y,x for speed
ts_roughing = tool_roughing.shape[0]
max_line,max_pix = self.get_im_maxmin(self.w, self.h, ts_roughing)
jrange = self.mxrange(self.row_mill, max_line, pixelstep_roughing)
irange = self.mxrange(self.row_mill, max_pix)
ln = max_line
if prn_detal > 0: print "(Previous tool shape: {0} pixels, max line: {1}, max pixel: {2} )".format(ts_roughing,max_line,max_pix)
trange = xrange(0,ts_roughing)
for lin in jrange: #lines
progress(lin, ln)
for pix in irange: #pixels
if self.row_mill:
y,x = lin,pix
else:
x,y = lin,pix
m1 = offset_image[y:y+ts_roughing, x:x+ts_roughing]
hhh1 = (m1 - tool_roughing).max() + previous_offset
for i in trange: #lines tool
for j in trange: #pixels tool
t = tool_roughing[i,j]
if isinf(t): continue
ty = i+y
tx = j+x
# self.image[ty,tx] <= hhh1 !!! t >= 0
dt = -self.image[ty,tx] + hhh1 + t
#dt = round(dt,16)
if dt < -epsilon and prn_detal > -1: print(" delta < -0.00001 ",ty,tx,dt,self.image[ty,tx], hhh1, t)
if dt < .0: dt = .0
try:
if map_tool1[ty,tx] > dt: #finde MAX for this pixel
map_tool1[ty,tx] = dt
except KeyError:
map_tool1[ty,tx] = dt
if prn_detal > 0: print "(End make map tool1. Map len: {0} pixels. End at {1})".format(len(map_tool1),datetime.now())
if len(map_tool1) == 0 and prn_detal > -1: print "(WARNING! Map tool1 len: {0}! )".format(len(map_tool1))
Принимаются любые предложения: ассемблер, ctypes, cython, средствами графического процессора, OpenGL, Numpy, CUDA, ... Желательно с примером.
Ответ
Увы, если ничего принципиально не менять в алгоритме, то разве что средствами графического процессора. На таких данных, пишите вы для процессора хоть на ассемблере, быстро не получится. Вот пример, чтоб не быть голословным:
int main()
{
int i, j, k, l;
long z = 0;
for (i = 0; i < 300; ++i)
for (j = 0; j < 4000; ++j)
for (k = 0; k < 100; ++k)
for (l = 0; l < 100; ++l)
z++;
}
Как вы видите, я первый параметр уменьшил в 10 раз (ну допустим вы распараллелите на гипотетические 10 ядер), и вот, что получилось на не самом слабом компьютере:
$ gcc test.c -o test
$ time ./test
./test 31,72s user 0,00s system 99% cpu 31,728 total
То есть почти 32 секунды, и это на чистом C с минимумом операций. Ну, будет, допустим, 20 секунд на самом современном процессоре, вряд ли это можно назвать быстро.
Обновление: написал вашу логику на Cython:
import numpy as np
cimport numpy as np
def calc(
np.ndarray[np.double_t, ndim=2] A,
np.ndarray[np.double_t, ndim=2] B,
np.ndarray[np.double_t, ndim=2] C):
cdef int N1 = A.shape[0]
cdef int M1 = A.shape[1]
cdef int N2 = B.shape[0]
cdef int M2 = B.shape[1]
cdef int i, j, k, l
cdef double tp
if N1 != C.shape[0] or M1 != C.shape[1]:
raise ValueError('Array dimensions mismatch')
for k in range(N2):
for l in range(M2):
for i in range(N1 - N2):
for j in range(M1 - M2):
tp = A[i + k, j + l] - B[k, l]
if C[i + k, j + l] > tp:
C[i + k, j + l] = tp
Тестовый код (опять в 10 раз меньше данных, чтоб не ждать долго):
import numpy as np
import calc
N, M = 300, 4000
n, m = 100, 100
A = np.random.random((N, M))
B = np.random.random((n, m))
C = np.random.random((N, M))
calc.calc(A, B, C)
Вот пример запуска:
$ time python2 test.py
python2 test.py 36,68s user 0,05s system 99% cpu 36,734 total
То есть ненамного дольше, чем на чистом C. На полных данных точно укладывается в ваше время.
Обновление 2: ради интереса запустил на полных данных (массив размера 3000x4000), и получил следующий результат:
$ time python2 test.py
python2 test.py 1572,55s user 2,11s system 99% cpu 26:16,10 total
В 30 минут уложились, но, конечно, получилось не так быстро, как хотелось бы. Предполагаю, это из-за того, что данные перестали помещаться в кэш. Скорее всего, если разбить большой массив на 10 меньших (которые будут попадать в кэш) и последовательно обработать их, получится гораздо быстрее. Можете попробовать сами, в обработке таких массивов будет пара тонких моментов, так что в рамках ответа на вопрос мне такой код писать лень.
Комментариев нет:
Отправить комментарий