Подтвердить что ты не робот

Отчетность дала результаты многолетней задачи Сельдерея

Проблема

Я сегментировал долговременную задачу в логических подзадачах, поэтому я могу сообщать результаты каждой подзадачи по мере ее завершения. Тем не менее, я пытаюсь сообщить о результатах задачи, которая никогда не будет успешно завершена (вместо этого она даст значения, как она есть), и я изо всех сил стараюсь сделать это с помощью моего существующего решения.

Фон

Я создаю веб-интерфейс для некоторых программ Python, которые я написал. Пользователи могут отправлять задания через веб-формы, а затем проверять их работу.

Скажем, у меня есть две функции, каждая из которых доступна через отдельные формы:

  • med_func: Выполняется ~ 1 минута, результаты передаются на render(), который создает дополнительные данные.
  • long_func: возвращает генератор. Каждый yield занимает порядка 30 минут и должен сообщаться пользователю. Есть так много уроков, мы можем считать этот итератор бесконечным (завершение только тогда, когда отменено).

Код, текущая реализация

С med_func я сообщаю результаты следующим образом:

При отправке формы я сохраняю AsyncResult в сеансе Django:

    task_result = med_func.apply_async([form], link=render.s())
    request.session["task_result"] = task_result

Представление Django для страницы результатов обращается к этому AsyncResult. Когда задача завершена, результаты сохраняются в объект, который передается как контекст шаблону Django.

def results(request):
    """ Serve (possibly incomplete) results of a session latest run. """
    session = request.session

    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]

            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del(request.session["task_result"])  # Don't need any more

    return render_to_response('results.html', request.session)

Это решение работает только тогда, когда функция фактически завершается. Я не могу объединить логические подзадачи long_func, потому что есть неизвестное число yield (каждая итерация цикла long_func может не дать результата).

Вопрос

Есть ли разумный способ доступа к предоставленным объектам из чрезвычайно долговременной задачи Сельдерея, чтобы они могли отображаться до истощения генератора?

4b9b3361

Ответ 1

Чтобы сельдерей знал, что такое текущее состояние задачи, он устанавливает некоторые метаданные в любом исходном бэкэнде. Вы можете копировать, чтобы хранить другие виды метаданных.

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.backend.mark_as_started(
            report_progress.request.id,
            progress=progress)

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress

Я бы не выбрал тонну данных, но он хорошо работает для отслеживания прогресса долговременной задачи.

Ответ 2

Пол ответ велик. В качестве альтернативы использованию mark_as_started вы можете использовать метод Task update_state. Они в конечном счете делают то же самое, но имя "update_state" немного более подходит для того, что вы пытаетесь сделать. Вы можете опционально определить настраиваемое состояние, которое указывает, что ваша задача выполняется (я назвал свое пользовательское состояние "PROGRESS" ):

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.update_state(state='PROGRESS', meta={'progress': progress})

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress

Ответ 3

Часть сельдерея:

def long_func(*args, **kwargs):
    i = 0
    while True:
        yield i
        do_something_here(*args, **kwargs)
        i += 1


@task()
def test_yield_task(task_id=None, **kwargs):
    the_progress = 0        
    for the_progress in long_func(**kwargs):
        cache.set('celery-task-%s' % task_id, the_progress)

Сторона Webclient, начальная задача:

r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id

Тестирование последнего полученного значения:

   v = cache.get('celery-task-%s' % session.get('task_id'))
   if v:
        do_someting()

Если вам не нравится использовать кеш, или это невозможно, вы можете использовать db, файл или любое другое место, в котором рабочий и сельдерей будет иметь оба доступа. С кешем это простейшее решение, но рабочие и серверы должны использовать один и тот же кеш.

Ответ 4

Пара вариантов:

1 - группы задач. Если вы можете перечислить все вспомогательные задачи с момента вызова, вы можете применить группу в целом, которая возвращает объект TaskSetResult, который вы можете использовать для мониторинга результатов группы в целом или отдельных задач в группе - запросите это как необходимо, когда вам нужно проверить статус.

2 - обратные вызовы. Если вы не можете перечислить все подзадачи (или даже если можете!), Вы можете определить веб-крючок/обратный вызов, который последний шаг в задаче вызвал, когда остальная часть задачи будет завершена. Крючок будет против URI в вашем приложении, которое поглощает результат и делает его доступным через DB или API-интерфейс приложения.

Некоторые из них могут решить вашу проблему.

Ответ 6

Лично я хотел бы видеть время начала, продолжительность, прогресс (количество предоставленных элементов), время остановки (или ETA), статус и любую другую полезную информацию. Было бы неплохо, если бы он выглядел похожим на соответствующий дисплей, возможно, как ps на Linux. Это, в конце концов, статус процесса.

Вы можете включить некоторые опции для приостановки или уничтожения задачи и/или "открыть" ее и отобразить подробную информацию о дочерних элементах или результатах.