Подтвердить что ты не робот

Использование памяти продолжает расти с Python multiprocessing.pool

Здесь программа:

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

Я нашел, что использование памяти (как VIRT, так и RES) продолжало расти до закрытия()/join(), есть ли решение, чтобы избавиться от этого? Я пробовал maxtasksperchild с 2,7, но это тоже не помогло.

У меня есть более сложная программа, которая вызывает apply_async() ~ 6M раз, а в точке ~ 1.5M у меня уже есть 6G + RES, чтобы избежать всех других факторов, я упростил программу до версии.

EDIT:

Оказалось, что эта версия работает лучше, спасибо за вход:

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

Я не помещал туда никакой блокировки, так как я считаю, что основной процесс является однопоточным (обратный вызов более или менее похож на событие, управляемое событиями для каждого документа, которое я читаю).

Я изменил диапазон индексов v1 до 1 000 000, то же самое, что и v2, и сделал некоторые тесты - это странно для меня, v2 даже на ~ 10% быстрее, чем v1 (33s против 37s), возможно, v1 занимался слишком многими внутренними заданиями по обслуживанию списка. v2 определенно является победителем в использовании памяти, он никогда не превышал 300M (VIRT) и 50M (RES), в то время как v1 был 370M/120M, лучшим был 330M/85M. Все номера были всего 3 ~ 4 раза тестирования, только ссылки.

4b9b3361

Ответ 1

У меня были проблемы с памятью в последнее время, так как я многократно использовал функцию многопроцессорности, поэтому он продолжает создавать нереста и оставляет их в памяти.

Здесь решение, которое я использую сейчас:

def myParallelProcess(ahugearray)
 from multiprocessing import Pool
 from contextlib import closing
 with closing( Pool(15) ) as p:
    res = p.imap_unordered(simple_matching, ahugearray, 100)
 return res

I < с

Ответ 2

Используйте map_async вместо apply_async, чтобы избежать чрезмерного использования памяти.

В первом примере измените следующие две строки:

for index in range(0,100000):
    pool.apply_async(worker, callback=dummy_func)

to

pool.map_async(worker, range(100000), callback=dummy_func)

Он будет мигать, прежде чем вы сможете увидеть его использование памяти в top. Измените список на более крупный, чтобы увидеть разницу. Но обратите внимание, что map_async сначала преобразует итерабельность, которую вы передаете ему, в список, чтобы рассчитать его длину, если у него нет метода __len__. Если у вас есть итератор огромного количества элементов, вы можете использовать itertools.islice для обработки их меньшими фрагментами.

У меня была проблема с памятью в реальной программе с гораздо большим количеством данных, и, наконец, найден виновник был apply_async.

P.S., в отношении использования памяти, ваши два примера не имеют очевидной разницы.

Ответ 3

У меня есть очень большой набор данных с облаками 3D-точек, которые я обрабатываю. Я попытался использовать многопроцессорный модуль, чтобы ускорить обработку, но я начал получать ошибки из памяти. После некоторых исследований и тестирования я решил, что я заполняю очередь задач, которые нужно обрабатывать намного быстрее, чем подпроцессы могут ее опорочить. Я уверен, что с помощью chunking или с помощью map_async или что-то, что я мог бы настроить нагрузку, но я не хотел вносить существенные изменения в окружающую логику.

Нервное решение, на которое я нахожусь, - это периодичность проверки длины pool._cache, и если кеш слишком велик, подождите, пока очередь не будет пуста.

В моем mainloop у меня уже был счетчик и тикер состояния:

# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print "waiting for cache to clear..."
        last.wait() # Where last is assigned the latest ApplyResult

Итак, каждые 10k вставки в пул я проверяю, есть ли более 1 миллиона операций в очереди (около 1 Гб памяти, используемой в основном процессе). Когда очередь заполнена, я просто жду завершения последнего вставленного задания.

Теперь моя программа может работать в течение нескольких часов без нехватки памяти. Основной процесс просто приостанавливается, пока рабочие продолжают обрабатывать данные.

BTW член _cache документирует пример пула модулей мультипроцессора:

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

Ответ 4

Я думаю, что это похоже на вопрос, который я опубликовал, но я не уверен, что у вас такая же задержка. Моя проблема заключалась в том, что я получал результаты из многопроцессорного пула быстрее, чем я их потреблял, поэтому они создавались в памяти. Чтобы этого избежать, я использовал semaphore, чтобы дросселировать входы в пул, чтобы они не слишком сильно опережали выходы я потреблял.

Ответ 5

Просто создайте пул в своем цикле и закройте его в конце цикла с помощью pool.close().