Подтвердить что ты не робот

Параллельно распределенная задача сельдерея с многопроцессорной обработкой

У меня есть задача CPU Celery. Я хотел бы использовать всю вычислительную мощность (ядра) через множество экземпляров EC2, чтобы быстрее выполнить эту работу быстрее (параллельная распределенная задача с сельдерием с многопроцессорной обработкой - я думаю).

Термины потоковая, многопроцессорная, распределенные вычисления, распределенная параллельная обработка - это все термины, которые я пытаясь понять лучше.

Пример задачи:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

Используя код выше (с примером, если это возможно), как раньше было бы распределять эту задачу с помощью Celery, позволяя разделить эту одну задачу, используя всю вычислительную мощность процессора на всей доступной машине в облаке?

4b9b3361

Ответ 1

Ваши цели:

  • Распространяйте свою работу на многие машины (распределенные вычислительная/распределенная параллельная обработка)
  • Распределите работу на данной машине по всем ЦП (Многопроцессорная/резьб)

Сельдерей может сделать это для вас довольно легко. Первое, что нужно понять, это то, что каждый работник сельдерея настроен по умолчанию для запуска как можно большего числа задач, так как в системе доступны ядра процессора:

Concurrency - количество обработчиков, выполняемых для обработки ваши задачи одновременно, когда все они заняты выполнением новой работы задачам придется дождаться завершения одной из задач, прежде чем она сможет обрабатываться.

Номер по умолчанию concurrency - это количество процессоров на этом компьютере (включая ядра), вы можете указать собственный номер с помощью опции -c. Нет рекомендуемого значения, так как оптимальное число зависит от количество факторов, но если ваши задачи в основном связаны с I/O-привязкой, вы можете попытайтесь его увеличить, эксперименты показали, что добавление больше, чем в два раза больше ЦП редко бывает эффективным и может ухудшить вместо этого.

Это означает, что для каждой отдельной задачи не нужно беспокоиться об использовании многопроцессорности/потоковой передачи, чтобы использовать несколько процессоров/ядер. Вместо этого сельдерей будет выполнять достаточно задач одновременно, чтобы использовать каждый доступный процессор.

С этой целью следующий шаг - создать задачу, обрабатывающую некоторую подмножество вашего list_of_millions_of_ids. Здесь у вас есть пара вариантов: каждый должен обрабатывать один идентификатор, поэтому вы запускаете N задач, где N == len(list_of_millions_of_ids). Это гарантирует, что работа будет распределена равномерно между всеми вашими задачами, поскольку никогда не будет случая, когда один рабочий заканчивает рано и просто ждет; если он нуждается в работе, он может вывести идентификатор из очереди. Вы можете сделать это (как упоминалось Джоном Доу), используя сельдерей group.

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

И для выполнения задач:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Другой вариант - разбить список на более мелкие куски и распределить части своим работникам. Этот подход рискует потратить несколько циклов, потому что вы можете столкнуться с некоторыми работниками, ожидающими, пока другие продолжают работать. Тем не менее,

Ответ 2

Почему бы не использовать задачу group celery для этого?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

В принципе, вы должны разделить ids на куски (или диапазоны) и дать им кучу задач в group.

Для более сложных, таких как агрегирование результатов конкретных задач сельдерея, я успешно использовал задачу chord для аналогичной цели:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Увеличьте settings.CELERYD_CONCURRENCY до разумного количества, которое вы можете себе позволить, тогда работники сельдерея будут продолжать выполнять ваши задания в группе или аккорде до тех пор, пока не будут выполнены.

Примечание: из-за ошибки в kombu возникли проблемы с повторным использованием работников для большого количества задач в прошлом, я не знаю, исправлено ли это сейчас. Возможно, это так, но если нет, уменьшите CELERYD_MAX_TASKS_PER_CHILD.

Пример, основанный на упрощенном и модифицированном коде, который я запускаю:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize получает результаты всех задач single_batch_processor. Каждая задача выполняется на любом работнике сельдерея, kombu определяет координаты.

Теперь я получаю: single_batch_processor и summarize ТАКЖЕ должны быть задачи сельдерея, а не обычные функции, иначе, конечно, он не будет распараллелен (я даже не уверен, что конструктор аккордов примет его, если он не будет сельдерея).

Ответ 3

В мире распространения есть только одна вещь, которую вы должны помнить прежде всего:

Преждевременная оптимизация - это корень всего зла. D. Knuth

Я знаю, что это кажется очевидным, но перед распределением двойной проверки вы используете лучший алгоритм (если он существует...). Сказав это, оптимизация распределения является балансирующим действием между тремя вещами:

  • Запись/чтение данных с постоянной среды,
  • Перемещение данных из среды A в среду B,
  • Обработка данных,

Компьютеры сделаны так, что чем ближе вы доберетесь до своего процессора (3), тем быстрее и эффективнее (1) и (2). Заказ в классическом кластере будет: сетевой жесткий диск, локальный жесткий диск, оперативная память, внутренняя часть обрабатывающего блока... В настоящее время процессоры становятся достаточно сложными, чтобы считаться ансамблем независимых аппаратных процессоров, обычно называемых ядрами, эти процессы обрабатывают данные (3) через потоки (2). Представьте, что ваше ядро ​​настолько быстрое, что при отправке данных одним потоком вы используете 50% мощности компьютера, если в ядре есть 2 потока, вы будете использовать 100%. Два потока на ядро ​​называются гиперпотоками, а ваша ОС будет видеть 2 процессора на гиперячеевое ядро.

Управление потоками в процессоре обычно называется многопоточным. Управление процессорами из ОС обычно называется многопроцессорной. Управление параллельными задачами в кластере обычно называется параллельным программированием. Управление зависимыми задачами в кластере обычно называется распределенным программированием.

Итак, где ваше узкое место?

  • В (1): попытайтесь сохранить и передать с верхнего уровня (ближе к вашему процессору, например, если сетевой жесткий диск медленнее сначала сохраняет на локальном жестком диске)
  • В (2): Это наиболее распространенный вопрос, постарайтесь избежать пакетов связи, которые не нужны для распространения или сжатия пакетов "на лету" (например, если HD медленный, сохраните только сообщение "пакетное вычисление" и сохранить промежуточные результаты в ОЗУ).
  • В (3): Вы закончили! Вы используете всю вычислительную мощность в своем распоряжении.

Как насчет сельдерея?

Сельдерей - это среда обмена сообщениями для распределенного программирования, которая будет использовать брокерский модуль для связи (2) и бэкэнд-модуль для сохранения (1), это означает, что вы сможете изменить конфигурацию, чтобы избежать большинства узких мест (если возможно) в вашей сети и только в вашей сети. Сначала создайте свой код для достижения наилучшей производительности на одном компьютере. Затем используйте сельдерей в своем кластере с конфигурацией по умолчанию и установите CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://[email protected]//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

Во время выполнения откройте свои любимые инструменты мониторинга, я использую по умолчанию для rabbitMQ и цветок для сельдерея и топ для процессора, ваши результаты будут сохранены в вашем бэкэнде. Примером узкого места в сети является то, что очереди задач растут настолько сильно, что они задерживают выполнение, вы можете перейти к изменению модулей или конфигурации сельдерея, если не ваше узкое место где-то еще.

Ответ 4

Добавление большего числа работников сельдерея, безусловно, ускорит выполнение этой задачи. Возможно, у вас может быть еще одно узкое место: база данных. Убедитесь, что он может обрабатывать одновременные вставки/обновления.

Относительно вашего вопроса: вы добавляете работников сельдерея, назначая другой процесс в своих экземплярах EC2 как celeryd. В зависимости от того, сколько рабочих вам нужно, вы можете добавить еще больше экземпляров.