Подтвердить что ты не робот

Запустить обратный вызов хорды, даже если основные задачи не выполняются

Возможно ли выполнить обратный вызов chord, даже если основные задачи не удались?

Я создал аккорд, в который я добавил кучу задач и зарегистрировал обратный вызов. Моя проблема заключается в том, что если одна из задач не срабатывает, обратный вызов не запускается, но я хотел бы, чтобы обратный вызов запускался в любом случае.

Я попытался зарегистрировать обратный вызов с помощью si() (immutability)

callback = tasks.run_delete_rule.si([timestamp])
header = [tasks.run_update_rule.s(i, timestamp) for i in item_ids]
result = chord(header)(callback)

Я также попытался добавить параметр ignore_result=True в оба декоратора задач, но не добился успеха.

4b9b3361

Ответ 1

Из проблемы github # 1881, если обратный вызов имеет параметр link_error, который принимает список имен задач, тогда, когда задача хорды не выполняется задачи link_error будут выполнены.

@task(name='super_task.good')
def good():
    return True

@task(name='super_task.raise_exception')
def raise_exception():
    raise ValueError('error')

@task(name='super_task.callback')
def callback(*args, **kwargs):
    logger.info('callback')
    logger.info(args)
    logger.info(kwargs)
    return 'finished'

@task(name='super_task.error_callback')
def error_callback(*args, **kwargs):
    logger.info('error_callback')
    logger.info(args)
    logger.info(kwargs)
    return 'error'

>>> c = chord(
        [raise_exception.s(), good.s(), raise_exception.s()], 
        callback.s().set(link_error=['super_task.error_callback'])
    )
>>> result = c()

Это приведет к выполнению аккорда и в вашем журнале сельдерея, вы увидите, что задача raise_exception завершится неудачно, а выполнение error_callback, которое будет получено в нем, вызывает task_id из callback.

В этот момент значение result будет содержать экземпляр AsyncResult callback, а потому, что в аккорде ошибки распространяются на обратный вызов, делая result.get(), приведет к исключению задач и result.traceback дает вам трассировку.

Если вы хотите иметь один обратный вызов, просто передайте имя обратного вызова хорды на link_error

callback.s().set(link_error='super_task.callback')

ПРИМЕЧАНИЕ

Другим вариантом является установка CELERY_CHORD_PROPAGATES = False, которая вернется к поведению предсердия 3.1 и всегда будет выполнять обратный вызов.

Но это не рекомендуется, потому что, как вы можете найти в проблеме github # 1349

Сельдерей 3.1 определяет, как обрабатываются хордовые ошибки, предыдущее поведение никогда не документировалось и больше несчастного случая, так как это никогда не было целью работать таким образом.

Мы не могли изменить поведение в выпуске исправления, поэтому вместо этого нужно было установить параметр, но никогда не было намерения, чтобы кто-то сознательно отключил новое поведение.

Новое поведение существует для защиты от такого рода проблем, и удаленная совместимость может быть удалена. Я предлагаю вам найти другой способ обработки ошибок здесь (и я бы не возражал против предложения, если вы можете придумать хороший api для него)

Ответ 2

Вам просто нужно изменить способ link_error. Вместо ссылки на строку передайте подпись с нужными аргументами.

В приведенном выше примере вы можете передать аргумент следующим образом

c = chord(
    [raise_exception.s(), good.s(), raise_exception.s()], 
    callback.s().set(link_error=[error_callback.s(<arguments_here>)])
)