Подтвердить что ты не робот

Запланировать повторное событие в Python 3

Я пытаюсь запланировать повторяющееся событие для запуска каждую минуту в Python 3.

Я видел класс sched.scheduler, но мне интересно, есть ли другой способ сделать это. Я слышал упоминания, что я мог бы использовать несколько потоков для этого, что я бы не прочь сделать.

Я в основном запрашиваю JSON, а затем разбираю его; его значение изменяется со временем.

Чтобы использовать sched.scheduler, мне нужно создать цикл, чтобы запросить его, чтобы запланировать четное выполнение в течение одного часа:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

Какие еще способы сделать это?

4b9b3361

Ответ 1

Вы можете использовать threading.Timer, но также планируете одноразовое событие, аналогично методу .enter объектов планировщика.

Нормальный шаблон (на любом языке) для преобразования одноразового планировщика в периодический планировщик должен состоять в том, чтобы каждое событие перепланировало себя с заданным интервалом. Например, с sched, я бы не использовал цикл, как вы делаете, а что-то вроде:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

и инициировать весь "вечный периодический график" с помощью вызова

periodic(scheduler, 3600, query_rate_limit)

Или, я мог бы использовать threading.Timer вместо scheduler.enter, но шаблон довольно схож.

Если вам нужна более четкая вариация (например, прекратите периодическую перепланировку в определенный момент времени или при определенных условиях), это не слишком сложно для размещения с несколькими дополнительными параметрами.

Ответ 2

Мой скромный подход к теме:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Особенности:

  • Стандартная библиотека, без внешних зависимостей
  • Использует шаблон, предложенный Алексом Мартнелли
  • start() и stop() безопасны для вызова несколько раз, даже если таймер уже запущен/остановлен
  • Функция, которая должна быть вызвана, может иметь позиционные и именованные аргументы
  • Вы можете изменить interval в любое время, это будет эффективно после следующего прогона. То же самое для args, kwargs и даже function!

Ответ 3

Вы можете использовать schedule. Он работает на Python 2.7 и 3.3 и довольно легкий:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)

Ответ 4

Вы можете использовать Advanced Python Scheduler. Он даже имеет cron-подобный интерфейс.

Ответ 5

Основываясь на ответе MestreLion, он решает небольшую проблему при многопоточности:

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()

Ответ 6

Используйте сельдерей.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something

Ответ 7

Основываясь на ответе Алекса Мартелли, я внедрил версию декоратора, которая проще интегрировать.

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')

Ответ 8

Здесь быстрый и грязный неблокирующий цикл с Thread:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

Там нет ничего особенного, worker создает новый поток сам с задержкой. Не может быть наиболее эффективным, но достаточно простым. Ответ Northtree был бы правильным, если вам нужно более сложное решение.

И исходя из этого, мы можем сделать то же самое, только с Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How it going?",time.time())

Ответ 9

См. Мой пример

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))

Ответ 10

task-scheduler - это планировщик процесса, который периодически организует и запускает задачу в соответствии с конфигурационным файлом YAML. См. https://github.com/tzutalin/task-scheduler