Подтвердить что ты не робот

Многопроцессорность и память Python

Я использую multiprocessing.imap_unordered для выполнения вычисления в списке значений:

def process_parallel(fnc, some_list):
    pool = multiprocessing.Pool()
    for result in pool.imap_unordered(fnc, some_list):
        for x in result:
            yield x
    pool.terminate()

Каждый вызов fnc возвращает объект HUGE в результате, по дизайну. Я могу хранить N экземпляров такого объекта в ОЗУ, где N ~ cpu_count, но не намного больше (не сотни).

Теперь использование этой функции занимает слишком много памяти. Память полностью расходуется в основном процессе, а не на рабочих.

Как imap_unordered сохранить готовые результаты? Я имею в виду результаты, которые уже были возвращены работниками, но еще не переданы пользователю. Я думал, что он умный и только вычислил их "лениво" по мере необходимости, но, видимо, нет.

Похоже, что, поскольку я не могу достаточно быстро использовать результаты process_parallel, пул продолжает массово размещать эти огромные объекты из fnc где-то, внутри, а затем взрывается. Есть ли способ избежать этого? Ограничить внутреннюю очередь?


Я использую Python2.7. Приветствия.

4b9b3361

Ответ 1

Как вы можете видеть, просмотрев соответствующий исходный файл (python2.7/multiprocessing/pool.py), IMapUnorderedIterator использует экземпляр collections.deque для хранения результатов. Если появился новый элемент, он добавляется и удаляется на итерации.

Как вы предположили, если другой огромный объект приходит, когда основной поток все еще обрабатывает объект, они также будут сохранены в памяти.

Что вы можете попробовать, это примерно так:

it = pool.imap_unordered(fnc, some_list)
for result in it:
    it._cond.acquire()
    for x in result:
        yield x
    it._cond.release()

Это должно привести к тому, что поток результатов задачи-результата-получателя будет заблокирован во время обработки элемента, если он пытается поместить следующий объект в deque. Таким образом, не должно быть более двух огромных объектов в памяти. Если это работает для вашего дела, я не знаю;)

Ответ 2

Самое простое решение, которое я могу придумать, - добавить замыкание, чтобы обернуть вашу функцию fnc, которая будет использовать семафор для управления общим количеством одновременных заданий, которые могут выполняться за один раз (я предполагаю, что основной процесс/поток будет увеличивать семафор). Значение семафора может быть рассчитано на основе размера задания и доступной памяти.