Подтвердить что ты не робот

Какие проблемы (если они есть) были бы объединены asyncio с многопроцессорной обработкой?

Как почти все знают, когда они сначала смотрят на потоки в Python, есть GIL, который делает жизнь несчастной для людей, которые на самом деле хотят обрабатывать параллельно - или, по крайней мере, дать ей шанс.

В настоящее время я смотрю на реализацию чего-то вроде шаблона Reactor. Фактически я хочу слушать входящие подключения сокета по одному потоку, а когда кто-то пытается подключиться, принять это соединение и передать его другому потоку, как для обработки.

Я еще не уверен, какую нагрузку я могу столкнуться. Я знаю, что в настоящее время настроено ограничение на входящие сообщения на 2 МБ. Теоретически мы могли получать тысячи в секунду (хотя я не знаю, видели ли мы практически что-то подобное). Количество времени, затрачиваемого на обработку сообщения, не очень важно, хотя, очевидно, быстрее будет лучше.

Я изучал шаблон Reactor и разработал небольшой пример, используя библиотеку multiprocessing, которая (по крайней мере, в тестировании), кажется, работает нормально. Однако теперь/скоро у нас будет библиотека asyncio, которая будет обрабатывать цикл событий для меня.

Есть ли что-нибудь, что могло бы укусить меня, объединив asyncio и multiprocessing?

4b9b3361

Ответ 1

Вы можете быть в состоянии безопасно объединить asyncio и multiprocessing без особых проблем, хотя вы не должны использовать multiprocessing напрямую. Кардинальный грех asyncio (и любой другой асинхронной структуры на основе цикла событий) блокирует цикл события. Если вы попытаетесь использовать multiprocessing напрямую, в любое время, когда вы блокируете ожидание дочернего процесса, вы собираетесь заблокировать цикл событий. Очевидно, это плохо.

Самый простой способ избежать этого - использовать BaseEventLoop.run_in_executor для выполнения функции в concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor - это пул процессов, реализованный с использованием multiprocessing.Process, но asyncio имеет встроенную поддержку для выполнения в нем функции без блокировки цикла событий. Вот простой пример:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

В большинстве случаев эта функция сама по себе достаточно хороша. Если вам нужны другие конструкции из multiprocessing, например Queue, Event, Manager и т.д., Существует сторонняя библиотека под названием aioprocessing (полное раскрытие: я написал его), который предоставляет asyncio -компонентные версии всех структур данных multiprocessing. Вот пример демонстрации того, что:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

Ответ 2

Да, есть довольно много бит, которые могут (или не могут) укусить вас.

  • Когда вы запускаете что-то вроде asyncio, он ожидает запустить один поток или процесс. Это само по себе не работает с параллельной обработкой. Вы каким-то образом должны распределять работу, оставляя операции ввода-вывода (в частности, на сокетах) в одном потоке/процессе.
  • В то время как ваша идея передать отдельные подключения к другому процессу обработчика хороша, его трудно реализовать. Первым препятствием является то, что вам нужно вытащить соединение из asyncio, не закрывая его. Следующим препятствием является то, что вы не можете просто отправить файловый дескриптор другому процессу, если вы не используете код платформы (возможно, Linux) из C-расширения.
  • Обратите внимание, что известно, что модуль multiprocessing создает несколько потоков для связи. Большую часть времени, когда вы используете коммуникационные структуры (например, Queue s), создается нить. К сожалению, эти потоки не полностью невидимы. Например, они могут не сработать чисто (когда вы намерены прекратить свою программу), но в зависимости от их количества использование ресурсов может быть заметным само по себе.

Если вы действительно собираетесь обрабатывать отдельные соединения в отдельных процессах, я предлагаю изучить различные подходы. Например, вы можете поместить сокет в режим прослушивания, а затем одновременно принимать соединения из нескольких рабочих процессов параллельно. Как только рабочий закончит обработку запроса, он может принять следующее соединение, поэтому вы все равно используете меньше ресурсов, чем форматирование процесса для каждого соединения. Spamassassin и Apache (mpm prefork) могут использовать эту рабочую модель, например. Это может оказаться проще и надежнее в зависимости от вашего варианта использования. В частности, вы можете заставить своих работников умереть после обслуживания настроенного количества запросов и быть вызванными мастер-процессом, тем самым устраняя большую часть негативных последствий утечек памяти.

Ответ 3

См. PEP 3156, в частности раздел о взаимодействии потоков:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

В этом документе четко указаны новые методы асинхронного использования, в том числе run_in_executor(). Обратите внимание, что Executor определен в concurrent.futures, я предлагаю вам также посмотреть там.