Подтвердить что ты не робот

Как эффективно выполнять многие задачи "немного позже" на Python?

У меня есть процесс, которому нужно выполнить кучу действий "позже" (обычно через 10-60 секунд). Проблема в том, что эти "более поздние" действия могут быть много (1000), поэтому использование Thread для каждой задачи нецелесообразно. Я знаю, что существуют такие инструменты, как gevent и eventlet, но одна из проблем заключается в том, что процесс использует zeromq для связи, поэтому мне понадобится некоторая интеграция (у eventlet уже есть).

Что мне интересно, каковы мои варианты? Итак, предложения приветствуются в строках библиотек (если вы использовали любой из упомянутых, пожалуйста, поделитесь своими впечатлениями), методы (поддержка Python "coroutine" , используйте один поток, который некоторое время спит и проверяет очередь), как использовать опрос zeromq или eventloop для выполнения задания или что-то еще.

4b9b3361

Ответ 1

рассмотрите возможность использования очереди приоритетов с одним или несколькими рабочими потоками для обслуживания задач. Основной поток может добавить работу в очередь, с меткой времени, которую он должен обслуживать. Потребители рабочего потока работают с очереди, спят до достижения значения приоритета, выполняют работу, а затем выталкивают другой объект из очереди.

Как насчет более сложного ответа. mklauber делает хороший момент. Если есть вероятность, что все ваши работники могут спать, когда у вас будет новая, более срочная работа, то queue.PriorityQueue на самом деле не является решением, хотя "приоритетная очередь" по-прежнему является используемой техникой, доступной из heapq. Вместо этого мы будем использовать другой примитив синхронизации; переменная условия, которая в питоне пишется threading.Condition.

Подход довольно прост, заглядывает в кучу, и если работа текущая, вытащите ее и выполните эту работу. Если бы была работа, но она планировалась в будущем, просто дожидайтесь состояния до тех пор или если вообще не будет никакой работы, спать вечно.

Продюсер делает это на справедливую долю работы; каждый раз, когда он добавляет новую работу, он уведомляет об этом, поэтому, если есть спящие рабочие, они проснутся и перепроверят очередь для более новой работы.

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
    """the actual work function.  nevermind the locks here, this just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print time.time() - START_TIME, message
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    map(threading.Thread.start, pool)

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    map(threading.Thread.join, pool)

Ответ 2

В этом ответе есть два предложения - мой первый и другой, который я обнаружил после первого.

Плановое

Я подозреваю, что вы ищете модуль sched.

РЕДАКТИРОВАТЬ: мое голое предложение оказалось мало полезным после того, как я его прочитал. Поэтому я решил протестировать модуль sched, чтобы узнать, может ли он работать, как я предложил. Вот мой тест: я бы использовал его с единственной нитью, более или менее таким образом:

class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Во-первых, я бы создал класс потоков, у которого был бы собственный планировщик и очередь. В планировщике будет зарегистрировано как минимум одно событие: одно для вызова метода планирования событий из очереди.

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Метод планирования событий из очереди блокирует очередь, расписывает каждое событие, пустет очередь и сам расписывается, для поиска новых событий в будущем. Обратите внимание, что период поиска новых событий короткий (одна секунда), вы можете его изменить:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Класс также должен иметь метод планирования пользовательских событий. Естественно, этот метод должен блокировать очередь при обновлении:

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

Наконец, класс должен вызывать основной метод планировщика:

    def run(self):
        self.scheduler.run()

Вот пример использования:

def print_time():
    print "scheduled:", time.time()


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()          
    st.schedule(print_time, 10)

    while True:
        print "main thread:", time.time()
        time.sleep(5)

    st.join()

Его вывод в моей машине:

$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

Этот код - просто быстрый пример, может потребоваться некоторая работа. Тем не менее, я должен признаться, что я немного очарован модулем sched, поэтому я предложил его. Вы можете также искать другие предложения:)

APScheduler

Глядя в Google на решения, подобные тем, которые я публикую, я нашел этот удивительный модуль APScheduler. Это настолько практично и полезно, что я уверен, что это ваше решение. Мой предыдущий пример был бы проще с этим модулем:

from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)

def print_time():
    print "scheduled:", time.time()
    sch.unschedule_func(print_time)

while True:
    print "main thread:", time.time()
    time.sleep(5)

(К сожалению, я не нашел, как планировать событие для выполнения только один раз, поэтому событие функции должно отменить сам. Могу поспорить, это можно решить с помощью какого-то декоратора.)

Ответ 3

Если у вас есть куча задач, которые нужно выполнить позже, и вы хотите, чтобы они сохранялись, даже если вы закрыли вызывающую программу или ваших сотрудников, вы должны действительно изучить Celery, что позволяет легко создавать новые задачи, выполнять их на любом компьютере, который вы хотите, и ждать результатов.

Из страницы сельдерея: "Это простая задача, добавляющая два числа:"

from celery.task import task

@task
def add(x, y):
    return x + y

Вы можете выполнить задачу в фоновом режиме или дождаться ее завершения:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16

Ответ 5

Вы писали:

одна из проблем заключается в том, что процесс использует zeromq для связи, поэтому мне потребуется некоторая интеграция (eventlet уже имеет его)

Похоже, ваш выбор будет сильно зависеть от этих деталей, которые немного неясны: как использовать zeromq для связи, сколько ресурсов потребует интеграция, и каковы ваши требования и доступные ресурсы.


Там есть проект django-ztask, который использует zeromq и предоставляет декодер task, подобный сельдерей один. Тем не менее, это (очевидно) Django-специфическое и, следовательно, может оказаться неприемлемым в вашем случае. Я не использовал его, сам сельдерей.

Используется сельдерей для нескольких проектов (они размещаются на веб-хосте ep.io PaaS, который предоставляет простой способ его использования).

Celery выглядит как довольно гибкое решение, позволяющее задерживать задачи, обратные вызовы, истечение срока действия задачи и повторную попытку, ограничение скорости выполнения задач и т.д. Его можно использовать с Redis, Beanstalk, CouchDB, MongoDB или SQL-базой данных.

Пример кода (определение задачи и асинхронное выполнение после задержки):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)

См. также раздел в документах celery.

Ответ 6

Вы посмотрели модуль multiprocessing? Он поставляется с Python. Он похож на модуль threading, но запускает каждую задачу в процессе. Вы можете использовать объект Pool() для настройки рабочего пула, а затем использовать метод .map() для вызова функции с различными аргументами заданной задачи.

Ответ 7

Pyzmq имеет реализацию ioloop с аналогичным api с тем, что торнадо ioloop. Он реализует DelayedCallback, который может вам помочь.

Ответ 8

Предполагая, что ваш процесс имеет цикл запуска, который может принимать сигналы, а продолжительность каждого действия находится в границах последовательной операции, использовать сигналы и тревогу posix()

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

Это зависит от того, что вы подразумеваете под "теми" позже "действия могут быть много", и если ваш процесс уже использует сигналы. Из-за формулировки вопроса непонятно, зачем нужен внешний пакет python.

Ответ 9

Другой вариант - использовать привязки Phyton GLib, в частности его функции timeout.

Это хороший выбор, если вы не хотите использовать несколько ядер и до тех пор, пока зависимость от GLib не проблема. Он обрабатывает все события в одном потоке, что предотвращает проблемы синхронизации. Кроме того, его инфраструктура событий также может использоваться для просмотра и обработки событий на основе IO (т.е. Сокетов).

UPDATE:

Здесь живое сеанс с использованием GLib:

>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this

Ответ 10

Хорошо, на мой взгляд, вы могли бы использовать что-то, называемое "совместная многозадачность". Это завихренная вещь, и ее действительно здорово. Посмотрите презентацию PyCon с 2010 года: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182

Ну, вам понадобится транспортная очередь, чтобы сделать это тоже...

Ответ 11

Simple. Вы можете наследовать свой класс из Thread и создать экземпляр вашего класса с параметром time time, поэтому для каждого экземпляра вашего класса вы можете указать тайм-аут, который заставит ваш поток ждать за это время