Подтвердить что ты не робот

Можно ли пропустить делегирование задачи сельдерея, если параметры и имя задачи уже поставлены в очередь на сервере?

Скажите, что у меня есть эта задача:

def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()

и я использую его так:

do_stuff_for_some_time.apply_async(args=[some_id], queue='some_queue')

Проблема, с которой я сталкиваюсь, заключается в том, что существует множество повторяющихся задач с одним и тем же параметром arg, и он ошеломляет очередь.

Можно ли применять async только в том случае, если одни и те же аргументы и одна и та же задача не находятся в очереди?

4b9b3361

Ответ 1

celery-singleton решает это требование

Предостережение: требуется redis broker (для распределенных блокировок)

pip install celery-singleton

Используйте базовый класс задачи Singleton:

from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()


из документов:

вызовы do_stuff.delay() будут либо ставить новую задачу или вернуть AsyncResult для текущего в очереди/запущенного экземпляра задача

Ответ 2

Я не уверен, есть ли у сельдерея такой вариант. Тем не менее, я хотел бы предложить обход.

1) Создайте модель для всех задач сельдерея, стоящих в очереди. В этой модели сохраните имя_задачи, имя_очереди, а также параметры

2) Используйте get_or_create для этой модели для каждой задачи сельдерея, которая готова к постановке в очередь.

3) Если created = True с шага 2, разрешите добавление задачи в очередь, иначе не добавляйте задачу в очередь

Ответ 3

Я бы попробовал сочетание cache lock и task result backend, в котором хранятся результаты каждой задачи:

  • Блокировка кэша предотвратит выполнение задач с теми же аргументами, которые будут добавлены в очередь несколько раз. Документация Celery содержит хороший пример реализации блокировки кеша здесь, но если вы не хотите создавать его самостоятельно, вы можете использовать celery-once.

  • Для бэкэнд результата задачи мы будем использовать рекомендуемый django-celery-results, который создает таблицу TaskResult, которую мы будет запрашивать результаты задачи.

Пример:

  • Установите и настройте django-celery-results:

    settings.py:

    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    CELERY_RESULT_BACKEND = 'django-db'  # You can also use 'django-cache'
    

    ./manage.py migrate django_celery_results

  • Установите и настройте модуль celery-once:

    tasks.py:

    from celery import Celery
    from celery_once import QueueOnce
    from time import sleep
    
    celery = Celery('tasks', broker='amqp://[email protected]//')
    celery.conf.ONCE = {
        'backend': 'celery_once.backends.Redis',
        'settings': {
            'url': 'redis://localhost:6379/0',
            'default_timeout': 60 * 60
         }
    }
    
    @celery.task(base=QueueOnce)
    def do_stuff_for_some_time(some_id):
        e = Model.objects.get(id=some_id)
        e.domanystuff()
    

    В этот момент, если задача с теми же аргументами будет выполнена,
    исключение AlreadyQueued.

  • Используйте приведенное выше:

    from django_celery_results.models import TaskResult
    
    try:
        result = do_stuff_for_some_time(some_id)
    except AlreadyQueued:
        result = TaskResult.objects.get(task_args=some_id)
    

Предостережения:

  • Учтите, что в момент возникновения исключения AlreadyQueued исходная задача с аргументом = some_id может не выполняться и поэтому она не будет иметь результатов в таблице TaskResult.

  • Помните все в своем коде, которое может пойти не так, и повесить любой из вышеперечисленных процессов (потому что он это сделает!).

Дополнительное чтение: