Подтвердить что ты не робот

Как проверить и отменить задачи Сельдерея по имени задачи

Я использую Celery (3.0.15) с Redis в качестве брокера.

Есть ли простой способ запросить количество задач с заданным именем, которые существуют в очереди сельдерея?

И, в качестве продолжения, есть способ отменить все задачи с заданным именем, которые существуют в очереди сельдерея?

Я прошел через Руководство по мониторингу и управлению и не вижу там решения.

4b9b3361

Ответ 1

# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)

# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
    celery.control.revoke(uuid, terminate=True)

Ответ 2

Есть одна проблема, о которой ранее не ответили ранее, и может отбросить людей, если они не знают об этом.

Среди уже опубликованных решений я бы использовал Danielle с одной незначительной модификацией: я бы импортировал задачу в свой файл и использовал свой атрибут .name для получить имя задачи для перехода на .tasks_by_type().

app.control.revoke(
    [uuid for uuid, _ in
     celery.events.state.State().tasks_by_type(task.name)])

Однако это решение будет игнорировать те задачи, которые были запланированы для будущего выполнения. Как и некоторые люди, которые комментировали другие ответы, когда я проверил, что .tasks_by_type() return, у меня был пустой список. И действительно, мои очереди были пусты. Но я знал, что в будущем запланированы задачи, и это была моя основная цель. Я мог видеть их, выполнив celery -A [app] inspect scheduled, но на них не повлиял код выше.

Мне удалось отменить запланированные задачи, выполнив следующие действия:

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in
     chain.from_iterable(app.control.inspect().scheduled()
                         .itervalues())])

app.control.inspect().scheduled() возвращает словарь, ключи которого являются именами рабочих и значениями, являются списками информации о планировании (следовательно, требуется chain.from_iterable, который импортируется из itertools). Информация о задаче находится в поле "request" информации о планировании, а "id" содержит идентификатор задачи. Обратите внимание, что даже после аннулирования запланированная задача по-прежнему будет отображаться среди запланированных задач. Запланированные задачи, которые будут отменены, не будут удалены из списка запланированных задач до истечения срока их таймера или до тех пор, пока Celery не выполнит какую-либо операцию очистки. (Перезагрузка работников запускает такую ​​очистку.)

Ответ 3

Вы можете сделать это одним запросом:

app.control.revoke([
    uuid
    for uuid, _ in
    celery.events.state.State().tasks_by_type(task_name)
])

Ответ 4

Похоже, flower обеспечивает мониторинг:

https://github.com/mher/flower

Мониторинг в режиме реального времени с использованием событий с сельдереем

Выполнение задачи и история Возможность отображения сведений о задаче (аргументы, время запуска, время выполнения и т.д.) Графики и статистика Пульт дистанционного управления

Просмотр статуса и статистики рабочего состояния Завершение работы и перезагрузка рабочего экземпляров. Размер пула рабочего пула и автомасштабирование. Просмотр и изменить очереди, которые рабочий экземпляр потребляет из представления в настоящее время выполняемые задачи Просмотр запланированных задач (ETA/обратный отсчет) Просмотр зарезервированных и отмененные задачи Применить ограничения времени и скорости. или завершать задачи HTTP API

Аутентификация OpenID