Подтвердить что ты не робот

Многопроцессорность Python Pool.map вызывает вызов?

У меня есть numpy.array из 640x480 изображений, каждый из которых имеет длину 630 изображений. Таким образом, общий массив составляет 630x480x640. Я хочу создать среднее изображение, а также вычислить стандартное отклонение для каждый пиксель на всех 630 изображениях.

Это легко выполнить с помощью

avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)

Однако, поскольку я запускаю это для 50 или около того таких массивов и 8 ядро ​​/16 нитей рабочей станции, я решил, что я буду жадно и распараллелить вещи с multiprocessing.Pool.

Итак, я сделал следующее:

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

Однако я видел только небольшое ускорение. Поместив инструкции print в chunk_avg_map, я смог определить, что за один раз запускается только один или два процесса, а не 16 (как и следовало ожидать).

Затем я запускал свой код через cProfile в iPython:

%prun current_image_anal.main()

В результате было указано, что наибольшее время было потрачено на звонки по приобретению:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1527  309.755    0.203  309.755    0.203 {built-in method acquire}

Я понимаю, что это что-то делать с блокировкой, но я не понимаю, почему мой код будет делать это. У кого-нибудь есть идеи?

[ИЗМЕНИТЬ] В соответствии с запросом здесь приведен пример работы script, который демонстрирует проблему. Вы можете профилировать его любыми средствами, которые вам нравятся, но когда я это сделал, я обнаружил, что львы доля времени была занята призывами к приобретению, а не средним или std, как я бы ожидали.

#!/usr/bin/python
import numpy
import multiprocessing

def main():
    fake_images = numpy.random.randint(0,2**14,(630,480,640))
    chunk_avg(fake_images)

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main()
4b9b3361

Ответ 1

Я считаю, что проблема в том, что количество процессорного времени, которое требуется для обработки каждого блока, невелико по сравнению с количеством времени, которое требуется для копирования ввода и вывода на рабочие процессы и из них. Я изменил ваш примерный код, чтобы разделить вывод на 16 четных фрагментов и распечатать разницу в времени процессора (time.clock()) между тем, когда начинается и заканчивается прогон chunk_avg_map(). В моей системе каждый отдельный запуск занимал чуть меньше секунды времени процессора, но общее время использования ЦП для группы процессов (система + время пользователя) составляло более 38 секунд. Очевидная 0,75-секундная накладная копия на каждый фрагмент оставляет вашу программу, выполняющую вычисления, лишь немного быстрее, чем multiprocessing может доставлять данные, что приводит к тому, что сразу два рабочих процесса будут использоваться сразу.

Если я модифицирую код таким образом, чтобы "входные данные" были просто xrange(16) и построили случайный массив в chunk_avg_map(), то я вижу, что время ожидания пользователя sysem + составляет около 19 секунд, а все 16 рабочих процессов, выполняющихся на то же время.