Подтвердить что ты не робот

Быстрая линейная интерполяция в Numpy/Scipy "вдоль пути"

Скажем, что у меня есть данные от метеостанций на 3 (известных) высотах на горе. В частности, каждая станция регистрирует измерение температуры на своем месте каждую минуту. У меня есть два типа интерполяции, которые я хотел бы выполнить. И я хотел бы иметь возможность выполнять каждый быстро.

Итак, давайте настроим некоторые данные:

import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns

np.random.seed(0)
N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]

altitudes = np.array([500, 1500, 4000]).astype(float)

finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = ['Time'], ['Altitude']
finaltemps.plot()

Отлично, поэтому наши температуры выглядят так: Данные о сырой температуре

Интерполируйте все времена на одну и ту же высоту:

Я думаю, что это довольно просто. Скажем, я хочу получить температуру на высоте 1000 для каждого раза. Я могу просто использовать встроенные методы интерполяции scipy:

interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

Температура со статической интерпретацией

Это хорошо работает. И посмотрим на скорость:

%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop

Интерполировать "вдоль пути":

Итак, теперь у меня есть вторая, связанная с этим проблема. Скажем, я знаю высоту пешеходной вечеринки как функцию времени, и я хочу вычислить температуру в своем (движущемся) месте, линейно интерполируя мои данные во времени. В частности, времена, когда я знаю, что местонахождение походной вечеринки - это те же самые, когда я знаю температуру на своих метеостанциях. Я могу сделать это без особых усилий:

location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                                             for i, loc in enumerate(location)])

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

Температура с перемещением

Итак, это работает очень хорошо, но важно отметить, что ключевая строка выше использует понимание списка, чтобы скрыть огромный объем работы. В предыдущем случае scipy создает для нас одну интерполяционную функцию и оценивает ее один раз на большом количестве данных. В этом случае scipy фактически создает N отдельные интерполяционные функции и каждый раз оценивает небольшой объем данных. Это по своей сути неэффективно. Здесь существует замкнутая петля (в понимании списка), и, кроме того, это просто кажется дряблым.

Неудивительно, что это намного медленнее, чем предыдущий случай:

%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                            for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop

Итак, второй пример работает на 1000 медленнее первого. То есть в соответствии с идеей о том, что тяжелый подъем является шагом "сделать линейную интерполяционную функцию"... который происходит во втором примере в 1000 раз, но только один раз в первом.

Итак, вопрос: есть ли лучший способ приблизиться ко второй проблеме? Например, есть ли хороший способ настроить его с помощью 2-мерной интерполяции (которая, возможно, могла бы справиться с этим случаем где время, в котором известны местоположения пешеходной зоны, - это не время, в которое были выбраны температуры)? Или есть особенно гладкий способ справиться с ситуациями здесь, где время выстраивается в линию? Или другое?

4b9b3361

Ответ 1

В течение фиксированного момента времени вы можете использовать следующую функцию интерполяции:

g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])

где a - высота туриста, aa вектор с 3 измерениями altitudes и cc является вектором с коэффициентами. Следует отметить три момента:

  • Для заданных температур (alltemps), соответствующих aa, определение cc может быть выполнено путем решения линейного матричного уравнения с использованием np.linalg.solve().
  • g(a) легко векторизовать для (N,) мерных a и (N, 3) мерных cc (включая np.linalg.solve() соответственно).
  • g(a) называется одномерным ядром сплайна первого порядка (для трех точек). Использование abs(a-aa[i])**(2*d-1) изменит порядок сплайнов на d. Этот подход можно интерпретировать как упрощенную версию гауссовского процесса в машинное обучение.

Таким образом, код будет выглядеть следующим образом:

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# generate temperatures
np.random.seed(0)
N, sigma = 1000, 5
trend = np.sin(4 / N * np.arange(N)) * 30
alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N)
                     for tmp0 in [70, 50, 40]])

# generate attitudes:
altitudes = np.array([500, 1500, 4000]).astype(float)
location = np.linspace(altitudes[0], altitudes[-1], N)


def doit():
    """ do the interpolation, improved version for speed """
    AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes])
    # This is slighty faster than np.linalg.solve(), because AA is small:
    cc = np.dot(np.linalg.inv(AA), alltemps)

    return (cc[0]*np.abs(location-altitudes[0]) +
            cc[1]*np.abs(location-altitudes[1]) +
            cc[2]*np.abs(location-altitudes[2]))


t_loc = doit()  # call interpolator

# do the plotting:
fg, ax = plt.subplots(num=1)
for alt, t in zip(altitudes, alltemps):
    ax.plot(t, label="%d feet" % alt, alpha=.5)
ax.plot(t_loc, label="Interpolation")
ax.legend(loc="best", title="Altitude:")
ax.set_xlabel("Time")
ax.set_ylabel("Temperature")
fg.canvas.draw()

Измерение времени дает:

In [2]: %timeit doit()
10000 loops, best of 3: 107 µs per loop

Обновление: Я заменил исходные списки в doit() для импорта скорости на 30% (для N=1000).

Кроме того, в соответствии с запросом на сравнение, блок кода теста @moarningsun на моей машине:

10 loops, best of 3: 110 ms per loop  
interp_checked
10000 loops, best of 3: 83.9 µs per loop
scipy_interpn
1000 loops, best of 3: 678 µs per loop
Output allclose:
[True, True, True]

Обратите внимание, что N=1000 - относительно небольшое число. Использование N=100000 дает результаты:

interp_checked
100 loops, best of 3: 8.37 ms per loop

%timeit doit()
100 loops, best of 3: 5.31 ms per loop

Это показывает, что этот подход лучше масштабируется для больших N, чем подход interp_checked.

Ответ 2

Линейная интерполяция между двумя значениями y1, y2 в местах x1 и x2 относительно точки xi проста:

yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)

С помощью некоторых векторных выражений Numpy мы можем выбрать соответствующие точки из набора данных и применить вышеуказанную функцию:

I = np.searchsorted(altitudes, location)

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)

Проблема состоит в том, что некоторые точки лежат на границах (или даже вне) известного диапазона, который следует учитывать:

I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1  # Prevent index-errors

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan

К счастью, Scipy уже предоставляет интерполяцию ND, которая также так же легко заботится о временах несоответствия, например:

from scipy.interpolate import interpn

time = np.arange(len(alltemps))

M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))

yI = interpn((time, altitudes), alltemps, xI)

Здесь тестовый код (без каких-либо pandas на самом деле, бит, который я включил решение из другого ответа):

import numpy as np
from scipy.interpolate import interp1d, interpn

def original():
    return np.array([interp1d(altitudes, alltemps[i, :])(loc)
                                for i, loc in enumerate(location)])

def OP_self_answer():
    return np.diagonal(interp1d(altitudes, alltemps)(location))

def interp_checked():
    I = np.searchsorted(altitudes, location)
    same = (location == altitudes.take(I, mode='clip'))
    out_of_range = ~same & ((I == 0) | (I == altitudes.size))
    I[out_of_range] = 1  # Prevent index-errors

    x1 = altitudes[I-1]
    x2 = altitudes[I]

    time = np.arange(len(alltemps))
    y1 = alltemps[time,I-1]
    y2 = alltemps[time,I]

    xI = location

    yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
    yI[out_of_range] = np.nan

    return yI

def scipy_interpn():
    time = np.arange(len(alltemps))
    xI = np.column_stack((time, location))
    yI = interpn((time, altitudes), alltemps, xI)
    return yI

N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)

funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
    print(func.func_name)
    %timeit func()

from itertools import combinations
outs = [func() for func in funcs]
print('Output allclose:')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])

В моей системе следующий результат:

original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]

Scipy interpn страдает от скорости по сравнению с самым быстрым методом, но для него универсальность и простота использования - это, безусловно, путь.

Ответ 3

Я предлагаю один шаг вперед. Во втором случае (интерполяция "по пути" ) мы делаем много разных функций интерполяции. Одна вещь, которую мы могли бы попробовать, - сделать только одну функцию интерполяции (такую, которая делает интерполяцию в измерении высоты во все времена, как в первом случае выше) и оценивать эту функцию снова и снова (в векторном виде). Это даст нам больше данных, чем мы хотим (это даст нам 1000 х 1000 матриц вместо 1000-элементного вектора). Но тогда наш целевой результат будет просто по диагонали. Таким образом, вопрос заключается в том, вызывает ли вызов одной функции на пути более сложные аргументы, выполняемые быстрее, чем выполнение многих функций и вызов их с помощью простых аргументов?

Ответ да!

Ключом является то, что функция интерполяции, возвращаемая scipy.interpolate.interp1d, может принимать numpy.ndarray в качестве своего ввода. Таким образом, вы можете эффективно вызвать интерполяционную функцию многократно при скорости C, подавая векторный ввод. То есть это путь, путь быстрее, чем запись цикла for, который вызывает интерполяционную функцию снова и снова на скалярном входе. Поэтому, когда мы вычисляем множество точек данных, которые мы отбрасываем, мы экономим еще больше времени, не создавая множество различных интерполяционных функций, которые мы едва используем.

old_way = interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                                                      for i, loc in enumerate(location)])
# look ma, no for loops!
new_way = np.diagonal(interp1d(altitudes, finaltemps.values)(location)) 
# note, `location` is a vector!
abs(old_way - new_way).max()
#-> 0.0

и еще:

%%timeit
res = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
#-> 100 loops, best of 3: 16.7 ms per loop

Таким образом, этот подход дает нам коэффициент 10 лучше! Может ли кто-нибудь лучше? Или предложить совершенно другой подход?