Подтвердить что ты не робот

Среднее скользящее среднее

Есть ли функция scipy или numpy или модуль для python, который вычисляет среднее значение для одномерного массива при заданном окне?

4b9b3361

Ответ 1

Для короткого быстрого решения, которое делает все в одном цикле, без зависимостей, код ниже отлично работает.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

Ответ 2

UPD: более эффективные решения были предложены Alleo и jasaarim.


Вы можете использовать np.convolve для этого:

np.convolve(x, np.ones((N,))/N, mode='valid')

Объяснение

Среднее значение пробега - это случай математической операции convolution. Для среднего значения вы сдвигаете окно по входу и вычисляете среднее значение содержимого окна. Для дискретных 1D-сигналов свертка - это одно и то же, за исключением того, что вместо среднего вы вычисляете произвольную линейную комбинацию, т.е. Умножаете каждый элемент на соответствующий коэффициент и складываем результаты. Эти коэффициенты, по одному для каждой позиции в окне, иногда называют ядром свертки. Теперь среднее арифметическое значений N равно (x_1 + x_2 + ... + x_N) / N, поэтому соответствующее ядро ​​(1/N, 1/N, ..., 1/N), и именно это мы получаем с помощью np.ones((N,))/N.

Ребра

Аргумент mode np.convolve указывает, как обрабатывать ребра. Я выбрал режим valid здесь, потому что я думаю, что большинство людей ожидают, что текущее среднее будет работать, но у вас могут быть другие приоритеты. Вот график, который иллюстрирует разницу между режимами:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

Running mean convolve modes

Ответ 3

Эффективное решение

Свертка намного лучше, чем простой подход, но (я думаю) он использует БПФ и, следовательно, довольно медленный. Однако, специально для вычисления среднего значения работает следующий подход

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

Код для проверки

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

Обратите внимание, что numpy.allclose(result1, result2) - True, два метода эквивалентны. Чем больше N, тем больше разница во времени.

Ответ 4

Обновление: В приведенном ниже примере показана старая функция pandas.rolling_mean, которая была удалена в последних версиях панд. Современный эквивалент вызова функции ниже будет

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

Панды больше подходят для этого, чем NumPy или SciPy. Его функция Roll_Man делает работу удобно. Он также возвращает массив NumPy, когда входные данные являются массивом.

Трудно превзойти rolling_mean по производительности при любой пользовательской реализации Python. Вот пример производительности по сравнению с двумя из предложенных решений:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

Есть также хорошие варианты того, как обращаться со значениями ребер.

Ответ 5

Вы можете вычислить текущее среднее значение с помощью:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

Но он медленный.

К счастью, numpy включает функцию convolve, которую мы можем использовать для ускорения работы. Среднее значение равносильно свертыванию x с вектором N long, при этом все члены равны 1/N. Многократная реализация convolve включает начальный переходный процесс, поэтому вам нужно удалить первые N-1 точки:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

На моей машине быстрая версия в 20-30 раз быстрее, в зависимости от длины входного вектора и размера окна усреднения.

Обратите внимание, что convolve включает в себя режим 'same', который, похоже, должен учитывать проблему начального переходного процесса, но он разделяет его между началом и концом.

Ответ 6

или модуль для python, который вычисляет

в моих тестах на Tradewave.net TA-lib всегда выигрывает:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

результаты:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

введите описание изображения здесь

Ответ 7

Готовое решение см. на странице https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html. Это обеспечивает скользящее среднее с типом flat окна. Обратите внимание, что это немного сложнее, чем простой метод "сделай сам", так как он пытается решить проблемы в начале и конце данных, отражая их (что может или не может работать в вашем случае...).

Для начала вы можете попробовать:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

Ответ 8

Я знаю, что это старый вопрос, но здесь есть решение, которое не использует никаких дополнительных структур данных или библиотек. Он является линейным по количеству элементов входного списка, и я не могу придумать какой-либо другой способ сделать его более эффективным (на самом деле, если кто-то знает, как лучше распределить результат, пожалуйста, дайте мне знать).

ПРИМЕЧАНИЕ: это было бы намного быстрее, если бы вместо списка использовался пустой массив, но я хотел устранить все зависимости. Также было бы возможно улучшить производительность многопоточным выполнением

Функция предполагает, что список ввода является одномерным, поэтому будьте осторожны.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result
Пример

Example

Предположим, что у нас есть список data = [ 1, 2, 3, 4, 5, 6 ], для которого мы хотим вычислить скользящее среднее с периодом 3, и что вам также нужен выходной список того же размера, что и входной (что чаще всего бывает).

Первый элемент имеет индекс 0, поэтому скользящее среднее следует вычислять по элементам индекса -2, -1 и 0. Очевидно, у нас нет данных [-2] и данных [-1] (если вы не мы хотим использовать специальные граничные условия), поэтому мы предполагаем, что эти элементы равны 0. Это эквивалентно заполнению нулями списка, за исключением того, что мы на самом деле не дополняем его, просто следим за индексами, которые требуют заполнения (от 0 до N -1).

Итак, для первых N элементов мы просто продолжаем складывать элементы в аккумуляторе.

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

Из элементов N + 1 форварды простое накопление не работает. мы ожидаем result[3] = (2 + 3 + 4)/3 = 3, но это отличается от (sum + 4)/3 = 3.333.

Способ вычисления правильного значения состоит в том, чтобы вычесть data[0] = 1 из sum+4, что дает sum + 4 - 1 = 9.

Это происходит потому, что в настоящее время sum = data[0] + data[1] + data[2], но это также верно для каждого i >= N, потому что перед вычитанием sum равен data[i-N] + ... + data[i-2] + data[i-1].

Ответ 9

Если важно сохранить размеры входных данных (вместо того, чтобы ограничивать выходные данные 'valid' областью свертки), вы можете использовать scipy.ndimage.filters.uniform_filter1d:

import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

y.shape == x.shape
>>> True

uniform_filter1d позволяет несколькими способами обрабатывать границу, где 'reflect' является значением по умолчанию, но в моем случае я предпочел 'nearest'.

Это также довольно быстро (почти в 50 раз быстрее, чем np.convolve):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

Ответ 10

Я еще не проверял, насколько это быстро, но вы можете попробовать:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

Ответ 11

Я чувствую, что это можно элегантно решить, используя узкое место

Смотрите базовый пример ниже:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • "мм" - это скользящее среднее для "а".

  • "Окно" - это максимальное количество записей, которые нужно учитывать для скользящего среднего.

  • "min_count" - это минимальное количество записей, которое нужно учитывать для скользящего среднего (например, для первых нескольких элементов или если массив имеет значения nan).

Хорошая часть заключается в том, что "Узкое место" помогает справиться со значениями наночастиц, а также очень эффективно.

Ответ 12

Немного поздно для вечеринки, но я сделал свою собственную небольшую функцию, которая НЕ обматывает концы или колодки нулями, которые затем используются для поиска среднего. В качестве дальнейшего рассмотрения следует, что он также повторно отображает сигнал в линейно разнесенных точках. Настройте код по своему желанию, чтобы получить другие функции.

Метод представляет собой простое матричное умножение с нормированным гауссовым ядром.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.

    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array

    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    N_in = size(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

Простое использование синусоидального сигнала с добавленным нормальным распределенным шумом: введите описание изображения здесь

Ответ 13

Вместо numpy или scipy я бы рекомендовал пандам сделать это быстрее:

df['data'].rolling(3).mean()

Это принимает скользящее среднее (MA) из 3 периодов столбца "данные". Вы также можете рассчитать сдвинутые версии, например, тот, который исключает текущую ячейку (сдвинутую назад), можно легко вычислить как:

df['data'].shift(periods=1).rolling(3).mean()

Ответ 14

Другой подход для поиска скользящей средней без использования numpy, panda

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(lambda a,b: b/a, 
               enumerate(itertools.accumulate(sample), 1)))

будет печатать [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

Ответ 15

Этот вопрос сейчас даже старше, чем когда NeXuS написал об этом в прошлом месяце, НО мне нравится, как его код имеет дело с крайними случаями. Однако, поскольку это "простая скользящая средняя", ее результаты отстают от данных, к которым они относятся. Я думал, что рассмотрение случаев кросс более удовлетворительным образом, чем режимы NumPy valid, same и full, может быть достигнуто путем применения аналогичного подхода к методу convolution().

В моем вкладе используется центральное среднее значение для выравнивания результатов с их данными. Если для полноразмерного окна используется слишком мало очков, средние значения вычисляются из последовательно меньших окон по краям массива. [На самом деле, из последовательно больших окон, но это подробности реализации.]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

Он относительно медленный, потому что он использует convolve(), и, вероятно, может быть довольно сильно подтянут истинной Pythonista, однако я считаю, что идея стоит.

Ответ 16

Этот ответ содержит решения, использующие стандартную библиотеку Python для трех различных сценариев.


Скользящее среднее с itertools.accumulate

Это эффективное решение для Python 3. 2+, которое вычисляет скользящее среднее по итерируемым значениям, используя itertools.accumulate.

>>> from itertools import accumulate
>>> values = range(100)

Обратите внимание, что values могут быть любыми итерируемыми, включая генераторы или любой другой объект, который создает значения на лету.

Во-первых, лениво построить совокупную сумму значений.

>>> cumu_sum = accumulate(value_stream)

Затем enumerate накопленную сумму (начиная с 1) и создайте генератор, который выдает долю накопленных значений и текущий индекс перечисления.

>>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1))

Вы можете выдать means = list(rolling_avg) если вам нужно все значения в памяти сразу или вызывать next постепенно.
(Конечно, вы также можете перебирать rolling_avg с помощью цикла for, который будет вызываться next.)

>>> next(rolling_avg) # 0/1
>>> 0.0
>>> next(rolling_avg) # (0 + 1)/2
>>> 0.5
>>> next(rolling_avg) # (0 + 1 + 2)/3
>>> 1.0

Это решение можно записать в виде функции следующим образом.

from itertools import accumulate

def rolling_avg(iterable):
    cumu_sum = accumulate(iterable)
    yield from (accu/i for i, accu in enumerate(cumu_sum, 1))

Сопрограмма, в которую вы можете отправлять значения в любое время

Эта сопрограмма использует значения, которые вы ей отправляете, и сохраняет скользящее среднее от значений, которые вы видели до сих пор.

Это полезно, когда у вас нет повторяющихся значений, но вы хотите получить значения, которые будут усредняться по одному в разное время на протяжении всей жизни ваших программ.

def rolling_avg_coro():
    i = 0
    total = 0.0
    avg = None

    while True:
        next_value = yield avg
        i += 1
        total += next_value
        avg = total/i

Сопрограмма работает так:

>>> averager = rolling_avg_coro() # instantiate coroutine
>>> next(averager) # get coroutine going (this is called priming)
>>>
>>> averager.send(5) # 5/1
>>> 5.0
>>> averager.send(3) # (5 + 3)/2
>>> 4.0
>>> print('doing something else...')
doing something else...
>>> averager.send(13) # (5 + 3 + 13)/3
>>> 7.0

Вычисление среднего по скользящему окну размера N

Эта функция-генератор принимает итеративный размер окна N и выдает среднее значение по текущим значениям внутри окна. Он использует deque, структуру данных, похожую на список, но оптимизированную для быстрых изменений (pop, append) на обеих конечных точках.

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)

    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)

Вот функция в действии:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

Ответ 17

Есть много ответов выше о вычислении среднего пробега. Мой ответ добавляет две дополнительные функции:

  1. игнорирует значения nan
  2. вычисляет среднее значение для N соседних значений, не включая значение самого интереса

Эта вторая особенность особенно полезна для определения того, какие значения отличаются от общей тенденции на определенную величину.

Я использую numpy.cumsum, так как это самый эффективный с точки зрения времени метод (см. Выше Alleo).

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

Этот код работает только для Ns. Его можно настроить для нечетных чисел, изменив np.insert padded_x и n_nan.

Пример вывода (черно-белый, movavg в синем): raw data (black) and moving average (blue) of 10 points around each value, not including that value. nan values are ignored.

Этот код может быть легко адаптирован для удаления всех значений скользящей средней, рассчитанных с меньшим, чем обрезание = 3 значения не-нано.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

raw data (black) and moving average (blue) while ignoring any window with fewer than 3 non-nan values

Ответ 18

В одном из ответов выше есть комментарий mab, который имеет этот метод. bottleneck имеет move_mean который является простым скользящим средним:

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)

min_count - удобный параметр, который в основном будет принимать скользящее среднее до этой точки в вашем массиве. Если вы не установите min_count, он будет равен window, и все до window точек будет nan.

Ответ 19

Хотя здесь есть решения для этого вопроса, пожалуйста, взгляните на мое решение. Это очень просто и хорошо работает.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

Ответ 20

Использовать только стандартную библиотеку Python (эффективная память)

Просто дайте другую версию использования только стандартной библиотеки deque. Меня удивляет, что большинство ответов используют pandas или numpy.

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

На самом деле я нашел другую реализацию в документации по Python

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

Однако реализация кажется мне немного более сложной, чем должна быть. Но по какой-то причине это должно быть в стандартных документах на Python, может кто-нибудь прокомментировать реализацию моей и стандартной документации?

Ответ 21

Из чтения других ответов я не думаю, что это вопрос, о котором идет речь, но я пришел сюда с необходимостью поддержания среднего значения списка значений, которые росли в размерах.

Поэтому, если вы хотите сохранить список значений, которые вы приобретаете где-то (сайт, измерительное устройство и т.д.), А среднее значение последних n обновленных значений, вы можете использовать приведенный ниже код, который минимизирует усилие добавления новые элементы:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        'buffer_size' numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it new.
            else:
                self._average *= n  # so it only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

И вы можете протестировать его, например:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

Который дает:

Values and their average as a function of values #

Ответ 22

Как насчет фильтра скользящей средней? Это также однострочный и имеет то преимущество, что вы можете легко манипулировать типом окна, если вам нужно что-то еще, чем прямоугольник, т.е. N-длинное простое скользящее среднее массива a:

lfilter(np.ones(N)/N, [1], a)[N:]

И при использовании треугольного окна:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

Ответ 23

Другое решение, использующее стандартную библиотеку и deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

Ответ 24

В образовательных целях позвольте мне добавить еще два решения Numpy (которые работают медленнее, чем решение cumsum):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

Используемые функции: as_strided, add.reduceat

Ответ 25

Если вы предпочитаете откатываться самостоятельно, а не использовать существующую библиотеку, будьте внимательны к ошибке с плавающей запятой и старайтесь минимизировать ее последствия:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count

Если все ваши значения примерно одинакового порядка, то это поможет сохранить точность, всегда добавляя значения примерно одинаковых величин.