Подтвердить что ты не робот

Выберите первый результат из двух сопрограмм в asyncio

Вопрос

Используя модуль Python asyncio, как выбрать первый результат из нескольких сопрограмм?

Пример

Мне может понадобиться реализовать тайм-аут при ожидании очереди:

result = yield from select(asyncio.sleep(1),
                           queue.get())

Аналоговые операции

Это будет похоже на Go select или Clojure core.async.alt!. Это что-то вроде обращения asyncio.gather (сборка похожа на all, select будет как any.)

4b9b3361

Ответ 1

Вы можете реализовать это, используя как asyncio.wait, так и asyncio.as_completed:

import asyncio

@asyncio.coroutine
def ok():
    yield from asyncio.sleep(1)
    return 5

@asyncio.coroutine
def select1(*futures, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    return (yield from next(asyncio.as_completed(futures)))

@asyncio.coroutine
def select2(*futures, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    done, running = yield from asyncio.wait(futures,
                                            return_when=asyncio.FIRST_COMPLETED)
    result = done.pop()
    return result.result()

@asyncio.coroutine
def example():
    queue = asyncio.Queue()
    result = yield from select1(ok(), queue.get())
    print('got {}'.format(result))
    result = yield from select2(queue.get(), ok())
    print('got {}'.format(result))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

Вывод:

got 5
got 5
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>

Обе реализации возвращают значение, полученное первым завершенным Future, но вы можете легко настроить его, чтобы вместо этого вернуть Future. Обратите внимание, что, поскольку другой Future, переданный каждой реализации select, никогда не уступает, предупреждение возникает, когда процесс завершается.

Ответ 2

Простое решение, используя asyncio.wait и его параметр FIRST_COMPLETED:

import asyncio

async def something_to_wait():
    await asyncio.sleep(1)
    return "something_to_wait"

async def something_else_to_wait():
    await asyncio.sleep(2)
    return "something_else_to_wait"


async def wait_first():
    done, pending = await asyncio.wait(
        [something_to_wait(), something_else_to_wait()],
        return_when=asyncio.FIRST_COMPLETED)
    print("done", done)
    print("pending", pending)

asyncio.get_event_loop().run_until_complete(wait_first())

дает:

done {<Task finished coro=<something_to_wait() done, defined at stack.py:3> result='something_to_wait'>}
pending {<Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>

Ответ 3

В случае необходимости применения тайм-аута к задаче существует стандартная функция библиотеки, которая выполняет именно это: asyncio.wait_for(), Ваш пример можно записать следующим образом:

try:
  result = await asyncio.wait_for(queue.get(), timeout=1)
except asyncio.TimeoutError:
  # This block will execute if queue.get() takes more than 1s.
  result = ...

Но это работает только для конкретного случая таймаута. Остальные два ответа здесь обобщают на любой произвольный набор задач, но ни один из этих ответов не показывает, как очистить задачи, которые не заканчиваются в первую очередь. Это приводит к тому, что на выходе выводятся сообщения "Задача была уничтожена, но она ожидает". На практике вы должны сделать что-то с этими ожидающими задачами. Основываясь на вашем примере, я предполагаю, что вам не нужны результаты других задач. Здесь приведен пример функции wait_first(), которая возвращает значение первой завершенной задачи и отменяет оставшиеся задачи.

import asyncio, random

async def foo(x):
    r = random.random()
    print('foo({:d}) sleeping for {:0.3f}'.format(x, r))
    await asyncio.sleep(r)
    print('foo({:d}) done'.format(x))
    return x

async def wait_first(*futures):
    ''' Return the result of the first future to finish. Cancel the remaining
    futures. '''
    done, pending = await asyncio.wait(futures,
        return_when=asyncio.FIRST_COMPLETED)
    gather = asyncio.gather(*pending)
    gather.cancel()
    try:
        await gather
    except asyncio.CancelledError:
        pass
    return done.pop().result()

async def main():
    result = await wait_first(foo(1), foo(2))
    print('the result is {}'.format(result))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Запуск этого примера:

# export PYTHONASYNCIODEBUG=1
# python3 test.py
foo(1) sleeping for 0.381
foo(2) sleeping for 0.279
foo(2) done
the result is 2
# python3 test.py
foo(1) sleeping for 0.048
foo(2) sleeping for 0.515
foo(1) done
the result is 1
# python3 test.py
foo(1) sleeping for 0.396
foo(2) sleeping for 0.188
foo(2) done
the result is 2

Нет сообщений об ошибках в ожидающих задачах, так как каждая ожидающая задача была очищена правильно.

На практике вы, вероятно, хотите, чтобы wait_first() возвращал будущее, а не будущий результат, иначе это будет действительно запутывать, пытаясь выяснить, какое будущее закончилось. Но в этом примере я вернул будущий результат, так как он выглядит немного чище.