Подтвердить что ты не робот

Эквивалент asyncio.Queues с рабочими "потоками"

Я пытаюсь выяснить, как переносить поточную программу на использование asyncio. У меня есть много кода, который синхронизируется вокруг нескольких стандартных библиотек Queues, в основном так:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

Один поток создает значения (возможно, пользовательский ввод), а другой поток что-то делает с ними. Дело в том, что эти потоки простаивают до тех пор, пока не появятся новые данные, после чего они просыпаются и что-то делают с ним.

Я пытаюсь реализовать этот шаблон с помощью asyncio, но я не могу понять, как заставить его "идти".

Мои попытки выглядят более или менее подобными (и вообще ничего не делают).

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? 

loop = asyncio.get_event_loop()
loop.run_forever()

Я пробовал варианты использования сопрограмм, не используя их, обертывая вещи в Задачи, пытаясь заставить их создавать или возвращать фьючерсы и т.д.

Я начинаю думать, что имею неправильное представление о том, как я должен использовать asyncio (возможно, этот шаблон должен быть реализован по-другому, что я не знаю). Любые указатели будут оценены.

4b9b3361

Ответ 1

Да, точно. Задачи - ваши друзья:

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True:
        yield from q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)


loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()

asyncio.ensure_future также можно использовать для создания задачи.

И имейте в виду: q.put() является сопрограммой, поэтому вы должны использовать yield from q.put(value).

UPD

Переключено с asyncio.Task()/asyncio.async() на новый бренд API loop.create_task() и asyncio.ensure_future() в примере.

Ответ 3

Немного позже и, возможно, OT, имейте в виду, что вы можете потреблять из Queue из нескольких задач, поскольку они были независимыми потребителями.

Следующий фрагмент показывает в качестве примера, как вы можете получить один и тот же шаблон пула потоков с задачами asyncio.

q = asyncio.Queue()

async def sum(x):
    await asyncio.sleep(0.1)  # simulates asynchronously
    return x

async def consumer(i):
    print("Consumer {} started".format(i))
    while True:
        f, x = await q.get()
        print("Consumer {} procesing {}".format(i, x))
        r = await sum(x)
        f.set_result(r)

async def producer():
    consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
    loop = asyncio.get_event_loop()
    tasks = [(asyncio.Future(), x) for x in range(10)]
    for task in tasks:
        await q.put(task)

    # wait until all futures are completed
    results = await asyncio.gather(*[f for f, _ in tasks])
    assert results == [r for _, r in tasks]

    # destroy tasks
    for c in consumers:
        c.cancel()


asyncio.get_event_loop().run_until_complete(producer())