Подтвердить что ты не робот

Как я могу периодически выполнять функцию с asyncio?

Я перехожу от tornado до asyncio, и я не могу найти эквивалент asyncio tornado PeriodicCallback. (A PeriodicCallback принимает два аргумента: функция для запуска и количество миллисекунд между вызовами.)

  • Есть ли такой эквивалент в asyncio?
  • Если нет, то какой будет самый чистый способ реализовать это, не рискуя получить RecursionError через некоторое время?
4b9b3361

Ответ 1

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

task = asyncio.Task(periodic())
loop = asyncio.get_event_loop()
loop.call_later(5, stop)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Ответ 2

Когда вы чувствуете, что что-то должно происходить "в фоновом режиме" вашей асинхронной программы, asyncio.Task может быть хорошим способом сделать это. Вы можете прочитать этот пост, чтобы посмотреть, как работать с задачами.

Здесь возможна реализация класса, периодически выполняющего некоторую функцию:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Протестируйте его:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Вывод:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

Как вы видите на start, мы просто запускаем задачу, которая вызывает некоторые функции и некоторое время спит в бесконечном цикле. На stop мы просто отменим эту задачу. Обратите внимание: эта задача должна быть остановлена ​​в момент завершения программы.

Еще одна важная вещь, что ваш обратный вызов не должен занимать много времени, чтобы быть выполненным (или он заморозит цикл событий). Если вы планируете вызывать несколько длительных func, вам может понадобиться чтобы запустить его в исполнителе.

Ответ 3

Нет встроенной поддержки периодических вызовов, нет.

Просто создайте свой собственный цикл планировщика, который засыпает и выполняет любые запланированные задачи:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

Итератор scheduled_tasks() должен создавать задачи, которые готовы к запуску в данный момент времени. Обратите внимание, что создание графика и отведение всех задач теоретически может занимать больше 1 секунды; идея заключается в том, что планировщик дает все задачи, которые должны были начаться с момента последней проверки.

Ответ 4

Основываясь на @A. Ответ Джесси Жири Дэвис (с комментариями @Torkel Bjørnson-Langen и @ReWrite) это улучшение, которое позволяет избежать дрейфа.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass