Подтвердить что ты не робот

Как правильно создавать и запускать параллельные задачи с помощью модуля python asyncio?

Я пытаюсь правильно понять и реализовать два одновременно работающих Task объектов, используя Python 3 относительно новый asyncio.

В двух словах asyncio, похоже, предназначен для обработки асинхронных процессов и параллельного выполнения Task по циклу событий. Это способствует использованию await (применяется в асинхронных функциях) как обратный вызов для ожидания и использования результата без блокировки цикла события. (Фьючерсы и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)

Он также предоставляет класс asyncio.Task(), специализированный подкласс Future, предназначенный для переноса сопрограмм. Предпочтительно вызывается с помощью метода asyncio.ensure_future(). Предполагаемое использование задач asyncio - позволить независимо запущенным задачам запускаться "одновременно" с другими задачами в пределах одного цикла событий. Я понимаю, что Tasks связаны с циклом события, который затем автоматически продолжает управлять сопрограммой между операторами await.

Мне нравится идея использовать одновременные Задачи без необходимости использовать один из классов Executor, но у меня нет нашел много проработки в отношении реализации.

Вот как я это делаю сейчас:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# OPTION 1: wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

В случае попытки одновременного запуска двух задач цикла, я заметил, что, если у задачи нет внутреннего выражения await, он застрянет в цикле while, эффективно блокируя выполнение других задач (много как обычный цикл while). Однако, как только Задачи должны (а) ждать, они, кажется, запускаются одновременно без проблем.

Таким образом, операторы await, как представляется, обеспечивают цикл событий с плацдармом для переключения между задачами, давая эффект concurrency.

Пример вывода с внутренним await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Пример вывода без внутреннего await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

Вопросы

Проходит ли эта реализация для "правильного" примера параллельных циклов Задачи в asyncio?

Правильно ли, что единственный способ, которым это работает, - это Task предоставить точку блокировки (выражение await), чтобы цикл события мог манипулировать несколькими задачами?

4b9b3361

Ответ 1

Да, любая сопрограмма, запущенная внутри цикла событий, блокирует запуск других сопрограмм и задач, если только

  • Вызывает другую сопрограмму с помощью yield from или await (если используется Python 3.5 +).
  • Возвращает.

Это потому, что asyncio является однопоточным; единственный способ запуска цикла события - не продолжать активное выполнение другой сопрограммы. Использование yield from/await временно приостанавливает сопрограмму, предоставляя циклу событий возможность работать.

Ваш примерный код в порядке, но во многих случаях вам, вероятно, не нужен долговременный код, который не выполняет асинхронный ввод-вывод, запущенный внутри цикла событий. В таких случаях часто бывает полезно использовать BaseEventLoop.run_in_executor для запуска кода в фоновом потоке или процессе. ProcessPoolExecutor будет лучшим выбором, если ваша задача связана с CPU, ThreadPoolExecutor будет использоваться, если вам нужно сделать некоторые операции ввода-вывода, которые не являются asyncio -другими.

Ваши две петли, например, полностью привязаны к процессору и не разделяют какое-либо состояние, поэтому наилучшая производительность будет получена при использовании ProcessPoolExecutor для параллельного запуска каждого цикла между CPU:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
    baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))

    loop.run_forever()

Ответ 2

Вам необязательно использовать yield from x для управления контуром событий.

В вашем примере я думаю, что правильным способом было бы сделать yield None или эквивалентно простой yield, а не yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Coroutines - это просто старые генераторы Python. Внутренний цикл событий asyncio хранит запись этих генераторов и вызывает gen.send() по каждому из них по одному в бесконечном цикле. Всякий раз, когда вы yield, вызов gen.send() завершается, и цикл может двигаться дальше. (Я упрощаю это, огляните https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 для фактического кода)

Тем не менее, я по-прежнему буду идти по маршруту run_in_executor, если вам нужно делать вычисления с интенсивным вычислением без обмена данными.