Подтвердить что ты не робот

Свертка двух трехмерных массивов с дополнением с одной стороны слишком медленная

В моем текущем проекте мне нужно " свертить" два трехмерных массива несколько необычным способом:

Предположим, что мы имеем два трехмерных массива A и B с размерами dimA и dimB (одинаковые для каждой оси). Теперь мы хотим создать третий массив C с размерами dimA + dimB для каждой оси.

Записи C вычисляются как:

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}

Моя текущая версия проста:

dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB

C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
    for x2 in range(dimB):
        for y1 in range(dimA):
            for y2 in range(dimB):
                for z1 in range(dimA):
                    for z2 in range(dimB):
                        x = x1+x2
                        y = y1+y2
                        z = z1+z2
                        C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2] 

К сожалению, эта версия очень медленная и непригодна для использования.

Моя вторая версия была:

C = scipy.signal.fftconvolve(A,B,mode="full")

Но это вычисляет только элементы max(dimA,dimB)

У кого-нибудь есть идея?

4b9b3361

Ответ 1

Вы пытались использовать Numba? Это пакет, который позволяет вам обернуть код Python, который обычно медленный с JIT-компилятором. Я быстро ударил по вашей проблеме с помощью Numba и значительно ускорился. Используя магическую функцию magicit timeit для IPython, функция custom_convolution заняла ~ 18 с, а оптимизированная функция Numba заняла 10,4 мс. Это ускорение более чем на 1700.

Здесь реализована функция Numba.

import numpy as np
from numba import jit, double

s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

Если вы вычисляете остатки между результатами обеих функций, вы получите нули. Это означает, что реализация JIT работает без проблем.

slow_result = custom_convolution(array_a, array_b) 
fast_result = fast_convolution(array_a, array_b)

print np.max(np.abs(slow_result - fast_result))

Выход, который я получаю для этого, 0.0.

Вы можете установить Numba в текущую настройку Python или быстро выполнить ее с помощью пакета AnacondaCE из continuum.io.

И последнее, но не менее важное: функция Numba быстрее, чем функция scipy.signal.fftconvolve, в несколько раз.

Примечание. Я использую Anaconda, а не AnacondaCE. Есть несколько отличий между двумя пакетами для производительности Numba, но я не думаю, что это будет сильно отличаться.

Ответ 2

Метод Numba выше - это аккуратный трюк, но это будет только преимуществом для относительно небольшого алгоритма N. Алгоритм O (N ^ 6) убьет вас каждый раз, независимо от того, насколько быстро он выполняется. В моих тестах метод fftconvolve пересекается быстрее, чем N = 20, а по N = 32 - в 10 раз быстрее. Оставив определение custom_convolution выше:

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

if __name__ == '__main__':
    reps = 3
    nt, ft = [], []
    x = range(2, 34)
    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)
        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        assert np.allclose(C1[:-1, :-1, :-1], C2)
        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
    plt.plot(x, ft, x, nt)
    plt.show()

Он также очень зависит от N, поскольку FFT будет намного быстрее для степеней 2. Время для версии FFT по существу постоянное для N = 17 до N = 32, и оно еще быстрее при N = 33, где он начинает быстро расходиться.

Вы можете попробовать обернуть реализацию FFT в Numba, но вы не можете сделать это напрямую со скудной версией.

(Извините, что создал новый ответ, но у меня нет точек для ответа напрямую.)

Ответ 3

Общее правило заключается в использовании правильного алгоритма для задания, которое, если ядро ​​свертки не является коротким по сравнению с данными, является сверткой на основе БПФ (кратковременно означает меньше log2 (n), где n - длина данные).

В вашем случае, поскольку вы свертываете два набора данных одинакового размера, вы, вероятно, захотите рассмотреть свертку на основе FFT.

Ясно, что scipy.signal.fftconvolve в этом отношении недостаточно. Используя более быстрый алгоритм БПФ, вы можете сделать гораздо лучше, скопировав свою собственную процедуру свертки (это не помогает тому, что fftconvolve заставляет размер преобразования равным двум, в противном случае это может быть обезглавлено обезьяной).

В следующем коде используется pyfftw, мои обертки вокруг FFTW и создает пользовательский класс свертки CustomFFTConvolution:

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

Это используется как:

custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction

с необязательным аргументом threads при построении класса. Цель создания класса - извлечь выгоду из способности FFTW заранее планировать преобразование.

Полный демо-код ниже просто расширяет @Kelsey answer за время и т.д.

Ускорение существенно по сравнению с решением numba и решением ванили fftconvolve. При n = 33 он примерно на 40-45 раз быстрее, чем оба.

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

def run_test():
    reps = 10
    nt, ft, cft, cft2 = [], [], [], []
    x = range(2, 34)

    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)

        custom_fft_conv = CustomFFTConvolution(A, B)
        custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)

        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        C3 = custom_fft_conv(A, B)
        C4 = custom_fft_conv_nthreads(A, B)

        assert np.allclose(C1[:-1, :-1, :-1], C2)
        assert np.allclose(C1[:-1, :-1, :-1], C3)
        assert np.allclose(C1[:-1, :-1, :-1], C4)

        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv(A, B))
        cft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv_nthreads(A, B))
        cft2.append(t.timeit(number=reps))

    plt.plot(x, ft, label='scipy.signal.fftconvolve')
    plt.plot(x, nt, label='custom numba convolve')
    plt.plot(x, cft, label='custom pyfftw convolve')
    plt.plot(x, cft2, label='custom pyfftw convolve with threading')        
    plt.legend()
    plt.show()

if __name__ == '__main__':
    run_test()

РЕДАКТИРОВАТЬ: Более свежий scipy делает лучшую работу не всегда для заполнения до 2 длины, поэтому ближе к выходу к случаю pyFFTW.