Подтвердить что ты не робот

Узнайте, существует ли задача сельдерея

Можно ли выяснить, существует ли задача с определенным идентификатором задачи? Когда я попытаюсь получить статус, я всегда буду ждать.

>>> AsyncResult('...').status
'PENDING'

Я хочу знать, является ли данный идентификатор задачи реальным идентификатором задачи сельдерея, а не случайной строкой. Я хочу получить разные результаты в зависимости от того, существует ли действительная задача для определенного идентификатора.

В прошлом, возможно, была действительная задача с тем же идентификатором, но результаты могли быть удалены из бэкэнд.

4b9b3361

Ответ 1

Celery не записывает состояние при отправке задачи, это частично является оптимизацией (см. Http://docs.celeryproject.org/en/latest/userguide/tasks.html#state).

Если вам это действительно нужно, просто добавьте:

from celery import current_app
# 'after_task_publish' is available in celery 3.1+
# for older versions use the deprecated 'task_sent' signal
from celery.signals import after_task_publish

# when using celery versions older than 4.0, use body instead of headers

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    # the task may not exist if sent using 'send_task' which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(headers['id'], None, "SENT")

Затем вы можете проверить состояние PENDING, чтобы обнаружить, что задача не была отправлена:

>>> result.state != "PENDING"

Ответ 2

AsyncResult.state возвращает PENDING в случае неизвестных идентификаторов задач.

ОЖИДАНИЕ

Задача ждет выполнения или неизвестна. Любой идентификатор задачи, который не является известно, что он находится в состоянии ожидания.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

Вы можете предоставить настраиваемые идентификаторы задач, если вам нужно различать неизвестные идентификаторы от существующих:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
... 
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
... 

Ответ 3

Сейчас я использую следующую схему:

  • Получить идентификатор задачи.
  • Установите ключ memcache, например 'task_% s'% task.id сообщение 'Started'.
  • Передайте идентификатор задачи клиенту.
  • Теперь от клиента я могу контролировать состояние задачи (заданное из сообщений задачи в memcache).
  • Из задачи в готовом виде - установите в ключевое сообщение memcache сообщение "Готов".
  • От клиента к заданию готов - запустите специальную задачу, которая удалит ключ из memcache и сделает необходимые действия по очистке.

Ответ 4

Вам нужно вызвать .get() в созданном объекте AsyncTask, чтобы на самом деле извлечь результат из бэкэнд.

См. Часто задаваемые вопросы по сельдеру.


Дальнейшее разъяснение моего ответа.

Любая строка технически является допустимым идентификатором, нет способа проверить идентификатор задачи. Единственный способ выяснить, существует ли задача - спросить бэкэнд, если он знает об этом, и сделать это, вы должны использовать .get().

Это указывает на проблему, когда блок .get() блокирует, когда бэкэнд не имеет никакой информации о указанном идентификаторе задачи, это по дизайну, чтобы вы могли запустить задачу, а затем дождаться ее завершения.

В случае исходного вопроса я собираюсь предположить, что OP хочет получить состояние ранее завершенной задачи. Для этого вы можете пропустить очень малые тайм-аут и ошибки таймаута catch:

from celery.exceptions import TimeoutError
try:
    # fetch the result from the backend
    # your backend must be fast enough to return
    # results within 100ms (0.1 seconds)
    result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
    result = None

if result:
    print "Result exists; state=%s" % (result.state,)
else:
    print "Result does not exist"

Разумеется, это работает только в том случае, если ваш бэкэнд хранит результаты, если нет способа узнать, является ли идентификатор задачи действительным или нет, потому что ничто не ведет запись о них.


Еще больше разъяснений.

То, что вы хотите сделать, не может быть выполнено с использованием бэкэнда AMQP, потому что он не сохраняет результаты, он пересылает их.

Мое предложение состояло в том, чтобы переключиться на бэкэнд базы данных, чтобы результаты были в базе данных, которую вы можете запросить вне существующих модулей сельдерея. Если в базе данных результатов нет задач, вы можете предположить, что идентификатор недействителен.

Ответ 5

Try

AsyncResult('blubb').state

который может работать.

Он должен вернуть что-то другое.

Ответ 6

Пожалуйста, исправьте меня, если я ошибаюсь.

if built_in_status_check(task_id) == 'pending'
   if registry_exists(task_id) == true
      print 'Pending'
   else
      print 'Task does not exist'