Подтвердить что ты не робот

Сельдерей прекращает выполнение цепочки

У меня есть задача check_orders, которая выполняется периодически. Он создает группу задач, чтобы я мог время, сколько времени выполнял задачи, и выполнять что-то, когда все это делается (это цель res.join [1] и grouped_subs). Сгруппированные задачи представляют собой пары связанных цепями.

То, что я хочу, это то, когда первая задача не отвечает условию (не удается), не выполняет вторую задачу в цепочке. Я не могу понять это для жизни меня, и я считаю, что это довольно базовая функциональность для менеджера очереди заданий. Когда я пробую то, что я прокомментировал после [2] (поднимая исключения, удаляя обратные вызовы)... мы по какой-то причине застреваем в join() в check_orders (он разбивает группу). Я попытался установить ignore_result для False также для всех этих задач, но он все еще не работает.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful'
    notify_user(args_sub_2)
4b9b3361

Ответ 1

По-моему, это распространенный случай использования, который не получает достаточной любви в документации.

Предполагая, что вы хотите прервать цепочку в середине пути, все еще сообщая о SUCCESS как о статусе выполненных задач, а не отправляя какой-либо журнал ошибок или еще ничего (иначе вы можете просто создать исключение), то способ выполнить это:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks = None
        return
    #Other stuff to do if end_chain is False

Итак, в вашем примере:

@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        self.request.callbacks = None

Будет работать. Обратите внимание, что вместо ignore_result=True и subtask() вы можете использовать ярлык .si(), как указано в @abbasov-alexander

Отредактировано для работы с режимом EAGER, как было предложено @PhilipGarnero в комментариях.

Ответ 2

Это невероятно, поскольку столь распространенный случай не рассматривается в какой-либо официальной документации. Мне пришлось справиться с той же проблемой (но с использованием shared_tasks с параметром bind, поэтому мы видим видимость объекта self), поэтому я написал собственный декоратор, который автоматически обрабатывает аннулирование:

def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested, e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

Вы можете использовать его следующим образом:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

Посмотрите полное описание здесь. Надеюсь, это поможет!

Ответ 3

Во-первых, кажется, что в функции существует исключение ignore_result не поможет вам.

Во-вторых, вы используете immutable = True. Это означает, что следующая функция (в нашем случае - уведомление) не принимает дополнительных аргументов. Вы должны использовать notify.subtask((args_sub_2, ), immutable=False), конечно, если это подходит для вашего решения.

В-третьих, вы можете использовать ярлыки:

notify.si(args_sub_2) вместо notify.subtask((args_sub_2, ), immutable=True)

и

is_room_open.s(args_sub_1) вместо is_room_open.subtask((args_sub_1, ))

Попробуйте использовать его код:

@task
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.s(args_sub_1), 
                                  notify.s(args_sub_2)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []
        return False

@task
def notify(result, args_sub_2):
    if result:
        # something else time consuming, only do this if the first part of the chain 
        # passed a test (the chained tasks before this were 'successful'
        notify_user(args_sub_2)
        return True
    return False

Если вы хотите исключений catch, вы должны использовать обратный вызов таким образом

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery

@celery.task
def log_error(task_id):
    result = celery.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        fh.write('--\n\n%s %s %s' % (
            task_id, result.result, result.traceback))