Сельдерей - цепочки групп и подзадачи. → Нарушение порядка - программирование
Подтвердить что ты не робот

Сельдерей - цепочки групп и подзадачи. → Нарушение порядка

Когда у меня есть что-то вроде следующего

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

Интуитивная интерпретация заключается в том, что task3 должен выполняться только после завершения всех задач в группе 2.

В действительности задача 3 выполняется, пока группа 1 запущена, но еще не завершена.

Что я делаю неправильно?

4b9b3361

Ответ 1

Итак, как оказалось, в сельдерее вы не можете объединить две группы вместе.
Я подозреваю, что это происходит потому, что группы, прикованные к задачам, автоматически становятся аккордом
- > Документы сельдерея: http://docs.celeryproject.org/en/latest/userguide/canvas.html

Цепочка группы вместе с другой задачей будет автоматически обновляться это будет аккорд:

Группы возвращают родительскую задачу. Когда я связываю две группы вместе, я подозреваю, что когда первая группа завершается, аккорд начинает "задачу" обратного вызова. Я подозреваю, что эта "задача" на самом деле является "родительской задачей" второй группы. Я также подозреваю, что эта родительская задача завершается, как только она заканчивает отмену всех подзадач в группе, и в результате следующий элемент после второй группы выполняется.

Чтобы продемонстрировать это, вот несколько примеров кода. Вам нужно будет иметь уже имеющийся экземпляр сельдерея.

# celery_experiment.py

from celery import task, group, chain, chord
from celery.signals import task_sent, task_postrun, task_prerun

import time
import logging

import random
random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###    
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass

@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    try:    
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)

@task()
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are seperated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 |  st4)

    r = chn()
    #r2 = chn2()

Ответ 2

У меня такая же проблема с сельдереем, пытаясь создать рабочий процесс, где первый шаг - "создать миллион задач". Пробные группы групп, подзадачи, в конце концов мой шаг2 стартует до того, как закончится шаг 1.

Короче говоря, я мог найти решение с использованием аккордов и немого финишера:

@celery.task
def chordfinisher( *args, **kwargs ):
  return "OK"

Ничего не делать, но это позволяет мне сделать это:

tasks = []
for id in ids:
    tasks.append( mytask.si( id ) )
step1 = chord( group( tasks ), chordfinisher.si() )

step2 = ...

workflow = chain( step1, step2 )

Изначально я хотел иметь step1 в подзадаче, но по той же причине, что и предполагалось, действие вызова группы заканчивается, задача считается завершенной, и мой рабочий процесс перемещается...

Если у кого-то есть что-то лучше, мне интересно!