Подтвердить что ты не робот

Как определить, была ли задача поставлена ​​в очередь в django-celery?

Здесь моя настройка:

  • django 1.3
  • сельдерей 2.2.6
  • django-celery 2.2.4
  • djkombu 0.9.2

В моем файле settings.py у меня есть

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

то есть. Я просто использую базу данных для выполнения задач в очереди.

Теперь о моей проблеме: у меня есть пользовательская задача, которая может занять несколько минут. Я хочу, чтобы задача выполнялась только один раз для каждого пользователя, и я буду кэшировать результаты задачи во временном файле, поэтому, если пользователь снова инициирует задачу, я просто возвращаю кешированный файл. У меня есть код, который выглядит так в моей функции:

task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)

if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")

Этот код почти работает. Но я столкнулся с проблемой, когда пользователь быстро перезагружает страницу. Там 1-3-секундная задержка между тем, когда задача поставлена ​​в очередь, и когда задача окончательно снята с очереди и передается работнику. За это время состояние задачи остается PENDING, из-за чего логика представления запускает повторяющуюся задачу.

Мне нужно, чтобы определить, была ли задача уже отправлена ​​в очередь, поэтому я не отправляю ее дважды. Есть ли стандартный способ сделать это в сельдерее?

4b9b3361

Ответ 1

Вы можете обмануть бит, сохранив результат вручную в базе данных. Позвольте мне объяснить, как это поможет.

Например, если используется RDBMS (таблица с столбцами - task_id, state, result):

Просмотр части:

  • Используйте управление транзакциями.
  • Используйте SELECT FOR UPDATE, чтобы получить строку где task_id == "long-task-% d" % user_id. SELECT FOR UPDATE будет блокировать другие запросы до тех пор, пока это не будет COMMIT или ROLLBACK.
  • Если этого не существует - установите состояние в PENDING и запустите "some_long_task", завершите запрос.
  • Если состояние PENDING - информирует пользователя.
  • Если состояние SUCCESS - установить состояние PENDING, запустите задачу, верните файл, на который указывает столбец "результат". Я основываю это на предположении, что вы хотите повторно запустить задачу по получению результата. COMMIT
  • Если состояние ERROR - установить состояние PENDING, запустите задачу, сообщите об этом пользователю. COMMIT

Задача:

  • Подготовьте файл, заверните в try, catch block.
  • В случае успеха - ОБНОВЛЯЙТЕ правильную строку с состоянием = УСПЕХ, результат.
  • В случае сбоя - ОБНОВЛЯЙТЕ правильную строку с состоянием = ОШИБКА.

Ответ 2

Я решил это с Редисом. Просто установите ключ в redis для каждой задачи, а затем удалите ключ из redis в задаче after_return. Redis легкий и быстрый.

Ответ 3

Я не думаю (как предложил Томек и другие), что использование базы данных - это способ сделать эту блокировку. django имеет встроенную структуру кэша, которая должна быть достаточной для выполнения этой блокировки и должна выполняться быстрее. См.:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django может быть настроен на использование memcached в качестве его кеша, и это может быть распределено между несколькими машинами... мне это кажется лучше. Мысли?