Подтвердить что ты не робот

Многопроцессорность с большими данными

Я использую multiprocessing.Pool() для распараллеливания некоторых тяжелых вычислений.

Целевая функция возвращает много данных (огромный список). У меня закончилась оперативная память.

Без multiprocessing, я просто изменил бы целевую функцию в генератор, yield получая результирующие элементы один за другим, когда они будут вычислены.

Я понимаю, что многопроцессорность не поддерживает генераторы - она ​​ждет весь вывод и возвращает его сразу, верно? Без поддавки. Есть ли способ заставить рабочих Pool получить данные, как только они станут доступны, без построения всего массива результатов в ОЗУ?

Простой пример:

def target_fnc(arg):
   result = []
   for i in xrange(1000000):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        for element in result:
            yield element

Это Python 2.7.

4b9b3361

Ответ 1

Это звучит как идеальный вариант использования для очереди: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

Просто подайте свои результаты в очередь от объединенных работников и проглотите их в главном.

Обратите внимание, что вы все равно можете столкнуться с проблемами давления в памяти, если вы не сбрасываете очередь почти так же быстро, как рабочие ее заполняют. Вы можете ограничить размер очереди (максимальное количество объектов, которые будут вписываться в очередь), и в этом случае объединенные рабочие блокируют в операторах queue.put, пока в очереди не будет доступно пространство. Это поставит потолок на использование памяти. Но если вы это сделаете, возможно, пришло время пересмотреть вопрос о том, требуется ли вообще объединение и/или если имеет смысл использовать меньше работников.

Ответ 2

Если ваши задачи могут возвращать данные в кусках... могут ли они быть разбиты на более мелкие задачи, каждый из которых возвращает один кусок? Очевидно, что это не всегда возможно. Когда это не так, вам нужно использовать какой-то другой механизм (например, Queue, как предлагает Лорен Абрамс). Но когда это так, это, вероятно, лучшее решение по другим причинам, а также решение этой проблемы.

В вашем примере это, безусловно, выполнимо. Например:

def target_fnc(arg, low, high):
   result = []
   for i in xrange(low, high):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    pool_args = []
    for low in in range(0, 1000000, 10000):
        pool_args.extend(args + [low, low+10000] for args in some_args)
    for result in pool.imap_unordered(target_fnc, pool_args):
        for element in result:
            yield element

(Вы могли бы, конечно, заменить петлю вложенным пониманием или zip и flatten, если хотите)

Итак, если some_args - [1, 2, 3], вы получите 300 задач - [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …], каждый из которых возвращает 10000 элементов вместо 1000000.

Ответ 3

Из вашего описания это звучит так, будто вы не так сильно заинтересованы в обработке данных по мере их поступления, как в том, чтобы не пропускать миллионный элемент list назад.

Там более простой способ: просто поместите данные в файл. Например:

def target_fnc(arg):
    fd, path = tempfile.mkstemp(text=True)
    with os.fdopen(fd) as f:
        for i in xrange(1000000):
            f.write('dvsdbdfbngd\n')
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result) as f:
            for element in f:
                yield element

Очевидно, что если ваши результаты могут содержать символы новой строки или не являются строками и т.д., вы должны использовать файл csv, numpy и т.д. вместо простого текстового файла, но идея то же самое.

При этом, даже если это проще, обычно бывает полезно обрабатывать данные куском за раз, поэтому разбиение ваших задач или использование Queue (как показывают два других ответа) могут быть лучше, если недостатки (соответственно, необходимость разбить задачи вверх или иметь возможность потреблять данные так быстро, как они созданы) не являются разрывами сделки.