Подтвердить что ты не робот

Цепи сельдерея не работают с партиями

На первый взгляд мне очень понравилась функция "Партии" в Celery, потому что мне нужно сгруппировать количество идентификаторов перед вызовом API (иначе меня могут выгнать).

К сожалению, при тестировании немного, пакетные задачи, похоже, не очень хорошо сочетаются с остальными примитивами Canvas, в данном случае цепей. Например:

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
        a.backend.mark_as_done(request.id, 42, request=request)
        print "filter_by_price " + str([r.args[0] for r in requests])

@a.task
def completed():
    print("complete")

Итак, с помощью этого простого рабочего процесса:

chain(get_price.s("ID_1"), completed.si()).delay()

Я вижу этот вывод:

[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] [email protected] ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']

Через 5 секунд фильтр_by_price() запускается так же, как и ожидалось. Проблема в том, что завершенный() никогда не вызывается.

Какие-нибудь идеи о том, что здесь происходит? Если не использовать партии, какой может быть достойный подход для решения этой проблемы?

PS: Я установил CELERYD_PREFETCH_MULTIPLIER=0, как говорят документы.

4b9b3361

Ответ 1

Похоже, что поведение пакетных задач существенно отличается от обычных задач. Пакетные задачи даже не излучают сигналы типа task_success.

Поскольку вам нужно вызвать задачу completed после get_price, вы можете вызвать ее непосредственно из get_price.

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
         # do something
    completed.delay()