Подтвердить что ты не робот

Повышение эффективности кода: стандартное отклонение на раздвижных окнах

Я пытаюсь улучшить функцию, которая вычисляет для каждого пикселя изображения стандартное отклонение пикселей, расположенных в окрестности пикселя. Моя функция использует две встроенные петли для работы по матрице, и это узкое место моей программы. Я думаю, что есть способ улучшить его, избавившись от циклов благодаря numpy, но я не знаю, как это сделать. Любые советы приветствуются!

рассматривает

def sliding_std_dev(image_original,radius=5) :
    height, width = image_original.shape
    result = np.zeros_like(image_original) # initialize the output matrix
    hgt = range(radius,height-radius)
    wdt = range(radius,width-radius)
    for i in hgt:
        for j in wdt:
            result[i,j] = np.std(image_original[i-radius:i+radius,j-radius:j+radius])
    return result
4b9b3361

Ответ 1

Прохладный трюк: вы можете вычислить стандартное отклонение, заданное только суммой квадратов и суммой значений в окне.

Поэтому вы можете очень быстро вычислить стандартное отклонение, используя единый фильтр данных:

from scipy.ndimage.filters import uniform_filter

def window_stdev(arr, radius):
    c1 = uniform_filter(arr, radius*2, mode='constant', origin=-radius)
    c2 = uniform_filter(arr*arr, radius*2, mode='constant', origin=-radius)
    return ((c2 - c1*c1)**.5)[:-radius*2+1,:-radius*2+1]

Это смехотворно быстрее, чем исходная функция. Для массива 1024x1024 и радиуса 20 старая функция занимает 34,11 секунды, а новая функция занимает 0,11 секунды, ускорение в 300 раз.


Как это работает математически? Он вычисляет величину sqrt(mean(x^2) - mean(x)^2) для каждого окна. Мы можем вывести эту величину из стандартного отклонения sqrt(mean((x - mean(x))^2)) следующим образом:

Пусть E - оператор ожидания (в основном mean()), а X - случайная величина данных. Тогда:

E[(X - E[X])^2]
= E[X^2 - 2X*E[X] + E[X]^2]
= E[X^2] - E[2X*E[X]] + E[E[X]^2] (по линейности оператора ожидания)
= E[X^2] - 2E[X]*E[X] + E[X]^2 (опять же по линейности, и тот факт, что E[X] является константой)
= E[X^2] - E[X]^2

что доказывает, что количество, вычисленное с использованием этого метода, математически эквивалентно стандартным отклонениям.

Ответ 2

Наиболее часто используемый метод для такого рода обработки изображений - использование таблиц с суммированными областями, идея, представленная в этой статье в 1984 году. Идея заключается в том, что, когда вы вычисляете количество, добавляя над окном, и перемещаете окно, например один пиксель вправо, вам не нужно добавлять все элементы в новое окно, вам нужно всего лишь вычесть крайний левый столбец из общего числа и добавить новый самый правый столбец. Поэтому, если вы создаете накопленный массив сумм по обоим измерениям из вашего массива, вы можете получить сумму по окну с несколькими суммами и вычитанием. Если вы сохраните таблицы суммированных областей для своего массива и его квадрата, очень легко получить дисперсию от этих двух. Вот реализация:

def windowed_sum(a, win):
    table = np.cumsum(np.cumsum(a, axis=0), axis=1)
    win_sum = np.empty(tuple(np.subtract(a.shape, win-1)))
    win_sum[0,0] = table[win-1, win-1]
    win_sum[0, 1:] = table[win-1, win:] - table[win-1, :-win]
    win_sum[1:, 0] = table[win:, win-1] - table[:-win, win-1]
    win_sum[1:, 1:] = (table[win:, win:] + table[:-win, :-win] -
                       table[win:, :-win] - table[:-win, win:])
    return win_sum

def windowed_var(a, win):
    win_a = windowed_sum(a, win)
    win_a2 = windowed_sum(a*a, win)
    return (win_a2 - win_a * win_a / win/ win) / win / win

Чтобы увидеть, что это работает:

>>> a = np.arange(25).reshape(5,5)
>>> windowed_var(a, 3)
array([[ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333]])
>>> np.var(a[:3, :3])
17.333333333333332
>>> np.var(a[-3:, -3:])
17.333333333333332

Это должно запускать несколько меток быстрее, чем методы, основанные на свертках.

Ответ 3

Прежде всего, существует более чем один способ сделать это.

Это не самая эффективная скорость, но с помощью scipy.ndimage.generic_filter позволит вам легко применить произвольную функцию python над движущимся окно.

В качестве быстрого примера:

result = scipy.ndimage.generic_filter(data, np.std, size=2*radius)

Заметим, что граничные условия можно контролировать с помощью mode kwarg.


Другой способ сделать это - использовать некоторые различные шагающие трюки, чтобы сделать представление о массиве, который эффективно перемещает окно, а затем применить np.std вдоль последней оси. (Примечание: это взято из одного из моих предыдущих ответов здесь: fooobar.com/questions/64232/...)

def strided_sliding_std_dev(data, radius=5):
    windowed = rolling_window(data, (2*radius, 2*radius))
    shape = windowed.shape
    windowed = windowed.reshape(shape[0], shape[1], -1)
    return windowed.std(axis=-1)

def rolling_window(a, window):
    """Takes a numpy array *a* and a sequence of (or single) *window* lengths
    and returns a view of *a* that represents a moving window."""
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp post to numpy-discussion.
    <http://www.mail-archive.com/[email protected]/msg29450.html>"""
    if window < 1:
       raise ValueError, "`window` must be at least 1."
    if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

Немного трудно понять, что происходит здесь на первый взгляд. Не вставлять один из моих собственных ответов, но я не хочу повторно вводить объяснение, поэтому посмотрите здесь: fooobar.com/questions/64243/..., если у вас нет перед этим видят эти "шагающие" трюки.

Если мы сравниваем тайминги с массивом случайных чисел с размером 100 × 100 с radius из 5, это на 10 раз быстрее, чем исходная версия или версия generic_filter. Однако у вас нет гибкости в граничных условиях с этой версией. (Он идентичен тому, что вы сейчас делаете, а версия generic_filter дает вам большую гибкость за счет скорости.)

# Your original function with nested loops
In [21]: %timeit sliding_std_dev(data)
1 loops, best of 3: 237 ms per loop

# Using scipy.ndimage.generic_filter
In [22]: %timeit ndimage_std_dev(data)
1 loops, best of 3: 244 ms per loop

# The "stride-tricks" version above
In [23]: %timeit strided_sliding_std_dev(data)
100 loops, best of 3: 15.4 ms per loop

# Ophion version that uses `np.take`
In [24]: %timeit new_std_dev(data)
100 loops, best of 3: 19.3 ms per loop

Недостатком версии "шашки-трюки" является то, что в отличие от "обычных" трюков с прокруткой окна, эта версия делает копию, и она намного больше, чем исходный массив. У вас возникнут проблемы с памятью, если вы используете это на большом массиве! (На боковой ноте это в основном эквивалентно ответу @Ophion с точки зрения использования памяти и скорости. Это просто другой подход к тому, чтобы делать то же самое.)

Ответ 4

Сначала вы можете получить индексы, а затем использовать np.take для формирования нового массива:

def new_std_dev(image_original,radius=5):
    cols,rows=image_original.shape

    #First obtain the indices for the top left position
    diameter=np.arange(radius*2)
    x,y=np.meshgrid(diameter,diameter)
    index=np.ravel_multi_index((y,x),(cols,rows)).ravel()

    #Cast this in two dimesions and take the stdev
    index=index+np.arange(rows-radius*2)[:,None]+np.arange(cols-radius*2)[:,None,None]*(rows)
    data=np.std(np.take(image_original,index),-1)

    #Add the zeros back to the output array
    top=np.zeros((radius,rows-radius*2))
    sides=np.zeros((cols,radius))

    data=np.vstack((top,data,top))
    data=np.hstack((sides,data,sides))
    return data

Сначала создайте некоторые случайные данные и проверьте тайминги:

a=np.random.rand(50,20)

print np.allclose(new_std_dev(a),sliding_std_dev(a))
True

%timeit sliding_std_dev(a)
100 loops, best of 3: 18 ms per loop

%timeit new_std_dev(a)
1000 loops, best of 3: 472 us per loop

Для больших массивов его всегда быстрее, если у вас достаточно памяти:

a=np.random.rand(200,200)

print np.allclose(new_std_dev(a),sliding_std_dev(a))
True

%timeit sliding_std_dev(a)
1 loops, best of 3: 1.58 s per loop

%timeit new_std_dev(a)
10 loops, best of 3: 52.3 ms per loop

Исходная функция быстрее для очень маленьких массивов, похоже, что точка безубыточности - это когда hgt*wdt >50. Что-то примечание, что ваша функция принимает квадратные рамки и помещает std dev в нижний правый индекс, а не выборку вокруг индекса. Это намеренно?