Подтвердить что ты не робот

Получение всех идентификаторов задач из вложенных цепочек и аккордов

Я использую Celery 3.1.9 с бэкэндом Redis. Работа, которую я выполняю, состоит из нескольких подзадач, которые выполняются в аккордах и цепочках. Структура выглядит следующим образом:

  1. подготовить
  2. загрузить данные (аккорд из 2 человек)
  3. анализировать и хранить загруженные данные
  4. длительный аккорд из 4 человек
  5. завершить
  6. создать отчет

Каждый элемент в списке является подзадачей, все они связаны друг с другом. Шаги 2 и 4 являются аккордами. Все это связано с созданием аккорда для шага 4, обратный вызов которого представляет собой цепочку 4 → 6, затем для шага 2 создается аккорд, обратный вызов которого равен 3 → первый аккорд. Затем, наконец, создается цепочка 1 → второй аккорд. Затем эта цепочка запускается с помощью delay(), а ее идентификатор сохраняется в базе данных.

Проблема двоякая. Во-первых, я хочу иметь возможность отозвать все это, а во-вторых, я хочу, чтобы в моем классе Task была особая on_failure, которая выполняет некоторую очистку и сообщает о сбое пользователю.

В настоящее время я храню идентификатор задачи цепочки. Я думал, что смогу использовать это, чтобы отозвать цепь. Кроме того, в случае ошибки я хотел пройти цепочку к ее корню (в обработчике on_failure), чтобы извлечь соответствующую запись из базы данных. Это не работает, потому что, когда вы воссоздаете экземпляр AsyncResult только с идентификатором задачи, его родительский атрибут - None.

Во-вторых, я пытался сохранить результат вызова serializable() во внешнем цепочке результатов. Это, однако, не возвращает все дерево объектов AsyncResult, оно просто возвращает идентификаторы первого уровня в цепочке (поэтому не идентификаторы дочерних элементов в аккордах.)

Третье, что я пытался, было реализовать serializable() самостоятельно, но, как оказалось, причина, по которой оригинальный метод не идет дальше двух уровней, заключается в том, что дочерние элементы цепочки являются объектами celery.canvas.chord, а не экземплярами AsyncResult.

Иллюстрация проблемы:

chord([
    foo.si(),
    foo.si(),
    foo.si(),
], bar.si() | bar.si())
res = chord.apply_async()
pprint(res.serializable())

Печатает следующее:

(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
  ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
   [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
    (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
    (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
 None)

Первый идентификатор - это идентификатор цепочки обратных вызовов, второй идентификатор - из самой задачи аккорда, а последние три - это реальные задачи внутри аккорда. Но я не могу получить результат от задачи внутри цепочки обратных вызовов (т.е. идентификатор двух вызовов bar.si()).

Есть ли способ получить фактические идентификаторы задачи?

4b9b3361