Как обрабатывать поступающие данные в реальном времени с помощью python pandas - программирование
Подтвердить что ты не робот

Как обрабатывать поступающие данные в реальном времени с помощью python pandas

Каков наиболее рекомендуемый/питонический способ обработки живых входящих данных с помощью pandas?

Каждые несколько секунд я получаю точку данных в следующем формате:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

Я хотел бы добавить его в существующий DataFrame, а затем запустить некоторый анализ.

Проблема в том, что просто добавление строк с помощью DataFrame.append может привести к проблемам с производительностью при этом копировании.

Что я пробовал:

Несколько человек предложили предварительно распределить большой DataFrame и обновить его, поскольку данные поступают:

In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=t, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

Другая альтернатива - это создание списка dicts. Просто добавив входящие данные в список и разрезав его на более мелкие DataFrames, чтобы выполнить эту работу.

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH

или что-то в этом роде, возможно, обработка ввода немного больше.

4b9b3361

Ответ 1

На самом деле вы пытаетесь решить две проблемы: сбор данных в реальном времени и анализ этих данных. Первая проблема может быть решена с помощью Python logging, который предназначен для этой цели. Тогда другая проблема может быть решена путем чтения того же файла журнала.

Ответ 2

Я бы использовал HDF5/pytables следующим образом:

  • Храните данные как список python "как можно дольше".
  • Добавить результаты в этот список.
  • Когда он становится "большим":
    • нажмите на HDF5 Store, используя pandas io (и добавленную таблицу).
    • очистить список.
  • Повтор.

Фактически, функция, которую я определяю, использует список для каждого "ключа", чтобы вы могли хранить несколько DataFrames в хранилище HDF5 в том же процессе.


Мы определяем функцию, которую вы вызываете с каждой строкой d:

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()

Примечание. Мы используем оператор with для автоматического закрытия хранилища после каждой записи. Он может быть быстрее, чтобы держать его открытым, но если это рекомендуется регулярно обновлять (закрытие флешей). Также обратите внимание, что более читаемо использовать коллекцию deque, а не список, но производительность списка будет немного лучше.

Чтобы использовать это, вы вызываете:

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")

Примечание: "df" - это сохраненный key, используемый в хранилище pytables.

После завершения задания убедитесь, что вы store_and_clear остальной кеш:

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)

Теперь ваш полный DataFrame доступен через:

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]

Некоторые комментарии:

  • 5000 можно настроить, попробуйте с меньшими/большими номерами в соответствии с вашими потребностями.
  • Список append - O (1), DataFrame append - O (len(df)).
  • Пока вы не выполняете статистику или перетаскивание данных, вам не нужен pandas, используйте самый быстрый.
  • Этот код работает с несколькими ключами (точками данных).
  • Это очень маленький код, и мы остаемся в списке python vanilla, а затем pandas dataframe...

Кроме того, чтобы получить обновленные данные, вы можете определить метод get, который сохраняет и очищает перед чтением. Таким образом, вы получите самые последние данные:

def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]

Теперь, когда вы получаете доступ:

df = get_latest("df")

вы получите доступную последнюю версию df.


Еще одна опция несколько более сложная: определите пользовательскую таблицу в ванильных pytables, см. tutorial.

Примечание. Для создания дескриптора столбца вам нужно знать имена полей.