У меня есть задача check_orders, которая выполняется периодически. Он создает группу задач, чтобы я мог время, сколько времени выполнял задачи, и выполнять что-то, когда все это делается (это цель res.join [1] и grouped_subs). Сгруппированные задачи представляют собой пары связанных цепями.
То, что я хочу, это то, когда первая задача не отвечает условию (не удается), не выполняет вторую задачу в цепочке. Я не могу понять это для жизни меня, и я считаю, что это довольно базовая функциональность для менеджера очереди заданий. Когда я пробую то, что я прокомментировал после [2] (поднимая исключения, удаляя обратные вызовы)... мы по какой-то причине застреваем в join() в check_orders (он разбивает группу). Я попытался установить ignore_result для False также для всех этих задач, но он все еще не работает.
@task(ignore_result=True)
def check_orders():
# check all the orders and send out appropriate notifications
grouped_subs = []
for thingy in things:
...
grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
notify.subtask((args_sub_2, ), immutable=True)))
res = group(grouped_subs).apply_async()
res.join() #[1]
logger.info('Done checking orders at %s' % current_task.request.id))
@task(ignore_result=True)
def is_room_open(args_sub_1):
#something time consuming
if http_req_and_parse(args_sub_1):
# go on and do the notify task
return True
else:
# [2]
# STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
# None of the following things work:
# is_room_open.update_state(state='FAILURE')
# raise celery.exceptions.Ignore()
# raise Exception('spam', 'eggs')
# current_task.request.callbacks[:] = []
@task(ignore_result=True)
def notify(args_sub_2):
# something else time consuming, only do this if the first part of the chain
# passed a test (the chained tasks before this were 'successful'
notify_user(args_sub_2)