Подтвердить что ты не робот

Как сделать задачу сельдерея неудачной из задачи?

В некоторых условиях я хочу сделать задачу сельдерея неудачной из этой задачи. Я попробовал следующее:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

Однако задача все еще сообщает, что она преуспела:

Задача sim.tasks.run_simulation [9235e3a7-c6d2-4219-bbc7-acf65c816e65] преуспел в 1.17847704887s: False

Кажется, что состояние может быть изменено только во время выполнения задачи и после его завершения - сельдерей изменяет состояние на то, что он считает результатом (см. этот вопрос). Есть ли какой-либо способ, не выполняя задачу, создавая исключение, чтобы вернуть сельдерей, что задача не удалась?

4b9b3361

Ответ 1

Я получил интересный ответ по этому вопросу от Ask Solem, где он предлагает обработчик after_return для решения проблемы. Это может быть интересным вариантом для будущего.

Тем временем я решил проблему, просто вернув строку "НЕИСПРАВНОСТЬ" из задачи, когда я хочу, чтобы она потерпила неудачу, а затем проверила ее следующим образом:

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # Failure processing task 

Ответ 2

Чтобы пометить задачу как невыполненную, не вызывая исключение, измените состояние задачи на FAILURE а затем Ignore исключение Ignore, поскольку при возврате любого значения задание будет считаться успешным, например:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://[email protected]//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

Но лучший способ - вызвать исключение из вашей задачи, вы можете создать собственное исключение для отслеживания этих ошибок:

class TaskFailure(Exception):
   pass

И поднимите это исключение из вашей задачи:

if some_condition:
    raise TaskFailure('Failure reason')

Ответ 3

Я хотел бы более подробно остановиться на ответе Пьера, поскольку столкнулся с некоторыми проблемами при использовании предложенного решения.

Чтобы разрешить настраиваемые поля при обновлении состояния задачи до состояний .FAILURE, важно также смоделировать некоторые атрибуты, которые будут иметь состояние FAILURE (обратите внимание на exc_type и exc_message). В то время как решение завершит задачу, любая попытка запросить состояние (для пример - получить значение "REASON FOR FAILURE") не удастся.

Ниже приведен фрагмент для справки, который я взял с: https://www.distributedpython.com/2018/09/28/celery-task-states/

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n')
                'custom': '...'
            })
        raise Ignore()