Подтвердить что ты не робот

Оценка малого временного сдвига между двумя временными рядами

У меня есть два временных ряда, и я подозреваю, что между ними есть сдвиг во времени, и я хочу оценить этот сдвиг во времени.

Этот вопрос задан раньше: Найти разность фаз между двумя (волновыми) волнами и найти временной сдвиг между двумя подобными формами колебаний, но в моем случае, временной сдвиг меньше разрешения данных. например, данные доступны с часовым разрешением, а временной сдвиг - всего несколько минут (см. изображение).

Причиной этого является то, что регистратор данных, используемый для измерения одной из серий, имеет мало минут в своем времени.

Существуют ли какие-либо алгоритмы, которые могут оценить этот сдвиг, предпочтительно без использования интерполяции?

solar irradiation forecast and solar irradiation measurement

4b9b3361

Ответ 1

Это довольно интересная проблема. Здесь попытка частичного решения с использованием преобразований Фурье. Это зависит от того, что данные являются умеренно-периодическими. Я не уверен, что он будет работать с вашими данными (где производные в конечных точках, похоже, не совпадают).

import numpy as np

X = np.linspace(0,2*np.pi,30)  #some X values

def yvals(x):
    return np.sin(x)+np.sin(2*x)+np.sin(3*x)

Y1 = yvals(X)
Y2 = yvals(X-0.1)  #shifted y values

#fourier transform both series
FT1 = np.fft.fft(Y1)
FT2 = np.fft.fft(Y2)

#You can show that analyically, a phase shift in the coefficients leads to a 
#multiplicative factor of `exp(-1.j * N * T_d)`

#can't take the 0'th element because that a division by 0.  Analytically, 
#the division by 0 is OK by L'hopital's<sp?> rule, but computers don't know calculus :)
print np.log(FT2[1:]/FT1[1:])/(-1.j*np.arange(1,len(X)))

Быстрый осмотр печатного выхода показывает, что частоты с наибольшим (N = 1, N = 2) дают разумные оценки, N = 3 тоже хорошо, если вы посмотрите на абсолютное значение (np.absolute), хотя я затрудняюсь объяснить, почему это было бы.

Может быть, кто-то, более знакомый с математикой, может взять это отсюда, чтобы дать лучший ответ...

Ответ 2

Одна из ссылок, которые вы предоставили, имеет правильную идею (на самом деле я делаю здесь почти то же самое)

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import correlate

a,b, N = 0, 10, 1000        #Boundaries, datapoints
shift = -3                  #Shift, note 3/10 of L = b-a

x = np.linspace(a,b,N)
x1 = 1*x + shift
time = np.arange(1-N,N)     #Theoritical definition, time is centered at 0

y1 = sum([np.sin(2*np.pi*i*x/b) for i in range(1,5)])
y2 = sum([np.sin(2*np.pi*i*x1/b) for i in range(1,5)])

#Really only helps with large irregular data, try it
# y1 -= y1.mean()
# y2 -= y2.mean()
# y1 /= y1.std()
# y2 /= y2.std()

cross_correlation = correlate(y1,y2)
shift_calculated = time[cross_correlation.argmax()] *1.0* b/N
y3 = sum([np.sin(2*np.pi*i*(x1-shift_calculated)/b) for i in range(1,5)])
print "Preset shift: ", shift, "\nCalculated shift: ", shift_calculated



plt.plot(x,y1)
plt.plot(x,y2)
plt.plot(x,y3)
plt.legend(("Regular", "Shifted", "Recovered"))
plt.savefig("SO_timeshift.png")
plt.show()

Это имеет следующий результат:

Preset shift:  -3
Calculated shift:  -2.99

enter image description here

Возможно, потребуется проверить

Обратите внимание, что argmax() корреляции показывает положение выравнивания, его нужно масштабировать по длине b-a = 10-0 = 10 и N, чтобы получить фактическое значение.

Проверка источника корреляции Источник не совсем очевидно, что ведет себя импортированная функция от sigtools. Для больших наборов данных круговая корреляция (с помощью быстрых преобразований Фурье) намного быстрее, чем прямолинейный метод. Я подозреваю, что это то, что реализовано в sigtools, но я не могу точно сказать. Поиск файла в моей папке python2.7 возвращал только скомпилированный файл C pyd.

Ответ 3

Это очень интересная проблема. Первоначально я собирался предложить решение, основанное на взаимной корреляции, похожее на user948652. Однако из вашего описания проблемы есть два вопроса с этим решением:

  • Разрешение данных больше, чем временной сдвиг, и
  • В некоторые дни предсказанное значение и измеренные значения имеют очень низкую корреляцию друг с другом.

В результате этих двух проблем, я думаю, что прямое применение решения кросс-корреляции, вероятно, фактически увеличит ваш сдвиг во времени, особенно в дни, когда предсказанные и измеренные значения имеют очень низкую корреляцию друг с другом.

В моем комментарии выше, я спросил, были ли у вас какие-либо события, которые происходят в обоих временных рядах, и вы сказали, что нет. Однако, исходя из вашего домена, я думаю, что у вас на самом деле есть два:

  • Восход
  • Закат

Даже если остальная часть сигнала плохо коррелирует, восход и закат должны быть в некоторой степени коррелированы, так как они будут монотонно увеличиваться от/уменьшаться до базового уровня ночи. Таким образом, здесь потенциальное решение, основанное на этих двух событиях, должно как минимизировать необходимую интерполяцию, так и не зависеть от взаимной корреляции слабокоррелированных сигналов.

1. Найти приблизительный Sunrise/Sunset

Это должно быть достаточно простым, просто возьмите первую и последнюю точки данных, которые выше, чем плоская линия ночи, и назовите их приблизительным восходом и закатом. Затем я бы сосредоточился на этих данных, а также на пунктах сразу по обе стороны, т.е.:

width=1
sunrise_index = get_sunrise()
sunset_index = get_sunset()

# set the data to zero, except for the sunrise/sunset events.
bitmap = zeros(data.shape)
bitmap[sunrise_index - width : sunrise_index + width] = 1
bitmap[sunset_index - width : sunset_index + width] = 1
sunrise_sunset = data * bitmap 

Существует несколько способов реализации get_sunrise() и get_sunset() в зависимости от того, насколько строго вам необходим ваш анализ. Я бы использовал numpy.diff, пороговое значение для определенного значения и взять первую и последнюю точки над этим значением. Вы также можете считывать данные о ночном времени из большого количества файлов, вычислять среднее и стандартное отклонение и искать первую и последнюю точки данных, превышающие, скажем, 0.5 * st_dev ночные данные. Вы также можете выполнить какое-то сопоставление шаблонов на основе кластеров, в частности, если разные классы дня (т.е. Солнечные или частично облачные или очень облачные) имеют очень стереотипные события восхода/захода солнца.

2. Данные Resample

Я не думаю, что есть какой-либо способ решить эту проблему без некоторой интерполяции. Я бы использовал пересчет данных на более высокую частоту дискретизации, чем сдвиг. Если сдвиг находится в масштабе минут, то до 1 минуты или 30 секунд.

num_samples = new_sample_rate * sunrise_sunset.shape[0]
sunrise_sunset = scipy.signal.resample(sunrise_sunset, num_samples)

В качестве альтернативы мы могли бы использовать кубический сплайн для интерполяции данных (см. здесь).

3. Гауссова свертка

Так как существует некоторая интерполяция, то мы не знаем, как точно предсказал реальный восход и закат. Таким образом, мы можем связать сигнал с гауссой, чтобы представить эту неопределенность.

gaussian_window = scipy.signal.gaussian(M, std)
sunrise_sunset_g = scipy.signal.convolve(sunrise_sunset, gaussian_window)

4. Кросс-корреляция

Используйте метод взаимной корреляции в user948652, чтобы получить временной сдвиг.

В этом методе есть много неотвеченных вопросов, которые потребуют более тщательного изучения и экспериментирования с данными, например, как лучший метод для определения восхода/захода солнца, насколько широким будет гауссовское окно и т.д. Но как бы я начал атаковать эту проблему. Удачи!

Ответ 4

Оптимизация для лучшего решения

Для заданных ограничений, а именно, что решение сдвинуто по фазе на меньшую величину, меньшую, чем метод выборки, простой алгоритм простого спуска работает хорошо. Я изменил образец проблемы @mgilson, чтобы показать, как это сделать. Обратите внимание, что это решение является надежным, поскольку оно может обрабатывать шум.

Функция ошибки. Оптимизация может быть оптимизирована, но это работает на удивление хорошо:

np.sqrt((X1-X2+delta_x)**2+(Y1-Y2)**2).sum()

То есть, минимизируйте евклидово расстояние между двумя кривыми, только регулируя ось x (фаза).

import numpy as np

def yvals(x):
    return np.sin(x)+np.sin(2*x)+np.sin(3*x)

dx = .1
unknown_shift = .03 * np.random.random() * dx

X1  = np.arange(0,2*np.pi,dx)  #some X values
X2  = X1 + unknown_shift

Y1 = yvals(X1)
Y2 = yvals(X2) # shifted Y
Y2 += .1*np.random.normal(size=X1.shape)  # now with noise

def err_func(p):
    return np.sqrt((X1-X2+p[0])**2+(Y1-Y2)**2).sum()

from scipy.optimize import fmin

p0 = [0,] # Inital guess of no shift
found_shift = fmin(err_func, p0)[0]

print "Unknown shift: ", unknown_shift
print "Found   shift: ", found_shift
print "Percent error: ", abs((unknown_shift-found_shift)/unknown_shift)

Пример прогона дает:

Optimization terminated successfully.
         Current function value: 4.804268
         Iterations: 6
         Function evaluations: 12
Unknown shift:  0.00134765446268
Found   shift:  0.001375
Percent error:  -0.0202912082305

Ответ 5

Я успешно использовал (в awgn-канале) подход подбора фильтра, который дает пиковую энергию m [n] при индексе n; затем подгоняем многочлен 2-й степени f (n) к m [n-1], m [n], m [n + 1] и находим минимум, полагая f '(n) == 0.

Ответ не обязательно абсолютно линейный, особенно если автокорреляция сигнала не обращается в нуль при m [n-1], m [n + 1].

Ответ 6

Действительно, интересная проблема, но пока не удовлетворительный ответ. Попробуйте изменить это...

Вы говорите, что предпочитаете не использовать интерполяцию, но, как я понимаю из вашего комментария, то, что вы на самом деле имеете в виду, это то, что вы хотели бы избежать повышения частоты дискретизации до более высокого разрешения. В базовом решении используется подход наименьших квадратов с линейной интерполяционной функцией, но без повышения дискретизации до более высокого разрешения:

import numpy as np
from scipy.interpolate import interp1d
from scipy.optimize import leastsq

def yvals(x):
    return np.sin(x)+np.sin(2*x)+np.sin(3*x)

dx = .1
X = np.arange(0,2*np.pi,dx)
Y = yvals(X)

unknown_shift = np.random.random() * dx
Y_shifted = yvals(X + unknown_shift)

def err_func(p):
    return interp1d(X,Y)(X[1:-1]+p[0]) - Y_shifted[1:-1]

p0 = [0,] # Inital guess of no shift
found_shift = leastsq(err_func,p0)[0][0]

print "Unknown shift: ", unknown_shift
print "Found   shift: ", found_shift

Пример прогона дает довольно точное решение:

Unknown shift:  0.0695701123582
Found   shift:  0.0696105501967

Если в сдвинутый Y включен шум:

Y_shifted += .1*np.random.normal(size=X.shape)

Получают несколько менее точные результаты:

Unknown shift:  0.0695701123582
Found   shift:  0.0746643381744

Точность при наличии шума улучшается, когда доступно больше данных, например. с:

X = np.arange(0,200*np.pi,dx)

Типичный результат:

Unknown shift:  0.0695701123582
Found   shift:  0.0698527939193