Как связать задачу сельдерея, которая возвращает список в группу? - программирование
Подтвердить что ты не робот

Как связать задачу сельдерея, которая возвращает список в группу?

Я хочу создать группу из списка, возвращаемого задачей Celery, так что для каждого элемента в наборе результатов задачи одна задача будет добавлена ​​в группу.

Вот простой пример кода, чтобы объяснить пример использования. ??? должен быть результатом предыдущей задачи.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    #do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

Я, вероятно, не подходит к этому правильно, но я уверен, что небезопасно вызывать задачи из задач:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

Мне не нужен результат из задачи секунд.

4b9b3361

Ответ 1

Вы можете получить такое поведение, используя промежуточную задачу. Здесь показана демонстрация создания такого "картографического" метода, который работает так, как вы предлагали.

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))

Кредит Спросить Солема за то, что он дал мне это предложение, когда я попросил его о помощи по аналогичной проблеме.