Подтвердить что ты не робот

Как комбинировать python asyncio с потоками?

Я успешно построил RESTful microService с Python asyncio и aiohttp, который слушает событие POST для сбора событий в реальном времени от разных фидеров.

Затем он создает структуру в памяти для кэширования последних 24-х событий в вложенной структуре defaultdict/deque.

Теперь я хотел бы периодически отмечать эту структуру на диске, предпочтительно используя pickle.

Так как структура памяти может быть > 100 МБ, я бы хотел избежать поддержки обработки входящих событий на время, необходимое для проверки структуры.

Я бы предпочел создать копию моментального снимка (например, глубину) структуры, а затем потратить время, чтобы записать ее на диск и повторить в заданный временной интервал.

Я искал примеры того, как объединить потоки (и это нить, даже лучшее решение для этого?) и asyncio для этой цели, но не смог найти что-то, что могло бы помочь мне.

Любые указатели для начала очень ценятся!

4b9b3361

Ответ 1

Довольно просто делегировать метод потоку или подпроцессу с помощью BaseEventLoop.run_in_executor:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x) # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)


loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())

Что касается использования ProcessPoolExecutor или ThreadPoolExecutor, такого рода трудно сказать; травление большого объекта, безусловно, будет потреблять некоторые циклы процессора, из которых вы бы подумали, что ProcessPoolExecutor - это путь. Однако передача вашего объекта в 100 Мбайт в Process в пуле потребует травления экземпляра в вашем основном процессе, отправки байтов в дочерний процесс через IPC, рассыпания его в дочернем элементе и последующего травления его, чтобы вы могли его написать на диск. Учитывая это, я предполагаю, что накладные расходы на раскачку/распиловку будут достаточно большими, чтобы вам было лучше использовать ThreadPoolExecutor, даже если вы собираетесь получить удар производительности из-за GIL.

Тем не менее, очень просто проверить оба пути и узнать наверняка, чтобы вы могли это сделать.