Подтвердить что ты не робот

Python - как запускать несколько сопрограмм одновременно с использованием asyncio?

Я использую библиотеку websockets для создания сервера websocket в Python 3.4. Здесь простой сервер эха:

import asyncio
import websockets

@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Скажем, мы - дополнительно - хотели отправить сообщение клиенту всякий раз, когда происходит какое-то событие. Для простоты отправьте сообщение каждые 60 секунд. Как мы это сделаем? Я имею в виду, потому что connection_handler постоянно ждет входящих сообщений, сервер может действовать только после того, как он получил сообщение от клиента, верно? Что мне здесь не хватает?

Возможно, для этого сценария необходима структура, основанная на событиях/обратных вызовах, а не на основе сопрограмм? Tornado?

4b9b3361

Ответ 1

TL; DR Используйте asyncio.ensure_future() для запуска нескольких сопрограмм одновременно.


Возможно, для этого сценария необходима структура, основанная на событиях/обратных вызовах, а не на основе сопрограмм? Торнадо?

Нет, для этого вам не нужны никакие другие рамки. Вся идея асинхронного приложения против синхронного заключается в том, что он не блокирует, ожидая результата. Неважно, как это реализовано, используя сопрограммы или обратные вызовы.

Я имею в виду, потому что connection_handler постоянно ждет входящих сообщений, сервер может действовать только после того, как он получил сообщение от клиента, правильно? Что мне здесь не хватает?

В синхронном приложении вы напишете что-то вроде msg = websocket.recv(), которое блокирует все приложение, пока вы не получите сообщение (как описано). Но в асинхронном приложении это совершенно другое.

Когда вы делаете msg = yield from websocket.recv(), вы говорите что-то вроде: приостановите выполнение connection_handler(), пока websocket.recv() ничего не произведет. Использование yield from внутри coroutine возвращает управление обратно в цикл событий, поэтому можно выполнить другой код, пока мы ожидаем результата websocket.recv(). Пожалуйста, обратитесь к документации, чтобы лучше понять, как работают сопрограммы.

Скажем, мы - дополнительно - хотели отправить сообщение клиенту всякий раз, когда происходит какое-то событие. Для простоты отправьте сообщение каждые 60 секунд. Как мы это сделаем?

Вы можете использовать asyncio.async() для запуска как можно большего количества сопрограмм, прежде чем выполнять блокирующий вызов для цикла запуска.

import asyncio

import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []


@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
        yield from connection.send(msg)
        print('> {}'.format(msg))


@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client


start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

Вот пример реализации клиента. Он просит вас ввести имя, получить его обратно с эхо-сервера, ждет еще два сообщения с сервера (которые являются нашими периодическими сообщениями) и закрывает соединение.

import asyncio

import websockets


@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))

    yield from connection.close()


asyncio.get_event_loop().run_until_complete(hello())

Важные моменты:

Ответ 2

Такая же проблема вряд ли может получить решение, пока я не увижу идеальный образец здесь: http://websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED)  # Important

Итак, я могу обрабатывать многопроцессорные задачи, такие как heartbeat и redis subscribe.

Ответ 3

Я удивлен, что gather не упоминается.

Из документации по Python:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24