Я хочу сравнить разные области 2-мерного массива $A $с заданным массивом $b $меньшего размера. Поскольку я должен делать это много раз, очень важно, чтобы это выполнялось очень быстро. У меня есть решение, которое отлично работает, но я надеялся, что это можно сделать лучше и быстрее.
Подробнее:
Скажем, у нас есть большой массив и небольшой массив. Я перебираю все возможные "патчи" в большом массиве, которые имеют тот же размер, что и малый массив, и сравнивают эти патчи с данным небольшим массивом.
def get_best_fit(big_array, small_array):
# we assume the small array is square
patch_size = small_array.shape[0]
min_value = np.inf
for x in range(patch_size, big_array.shape[0] - patch_size):
for y in range(patch_size, big_array.shape[1] - patch_size):
p = get_patch_term(x, y, patch_size, big_array)
tmp = some_metric(p, small_array)
if min_value > tmp:
min_value = tmp
min_patch = p
return min_patch, min_value
Чтобы получить исправления, я получил реализацию прямого доступа к массиву:
def get_patch_term(x, y, patch_size, data):
"""
a patch has the size (patch_size)^^2
"""
patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
(y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
return patch
Я думаю, что это самая важная задача, и ее можно выполнять быстрее, но я не уверен в этом.
Я посмотрел на Китона, но, возможно, я сделал это неправильно, я не очень хорошо знаком с ним.
Моя первая попытка - прямой перевод на cython:
def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
И это, кажется, быстрее (см. ниже), но я думал, что параллельный подход должен быть лучше, поэтому я придумал этот
def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
assert big_array.dtype == DTYPE
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
with nogil, parallel():
for x in prange(x_i - patch_size, x_i + patch_size + 1):
for y in prange(y_i - patch_size, y_i + patch_size + 1):
patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
#cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
Это, к сожалению, не быстрее. Для тестирования я использовал:
A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)
x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))
Что дает
0.0008792859734967351
0.0029909340664744377
0.0029337930027395487
Итак, мой первый вопрос: возможно ли это сделать быстрее? Второй вопрос заключается в том, почему параллельный подход не быстрее первоначальной реализации numpy?
Edit:
Я попытался продолжить распараллеливание функции get_best_fit, но, к сожалению, я получаю много ошибок, заявляя, что я не могу назначить объект Python без gil.
Вот код:
def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
np.ndarray[DTYPE_t, ndim=2] small_array):
# we assume the small array is square
cdef Py_ssize_t patch_size = small_array.shape[0]
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
cdef Py_ssize_t y_range = big_array.shape[1] - patch_size
cdef np.ndarray[DTYPE_t, ndim=2] p
cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))
with nogil, parallel():
for x in prange(patch_size, x_range):
for y in prange(patch_size, y_range):
p = get_patch_term_fast(x, y, patch_size, big_array)
weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))
return np.min(weights)
PS: Я пропустил часть возвращения самого маленького патча...