Подтвердить что ты не робот

Использование мультипроцессинга. Процесс с максимальным количеством одновременных процессов

У меня есть код Python:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

который работает хорошо. Однако MAX_PROCESSES является переменной и может принимать любое значение от 1 до 512. Поскольку я запускаю этот код только на машине с 8 ядрами, мне нужно выяснить, возможно ли ограничить число процессов, разрешенных для одновременной работы. Я изучил multiprocessing.Queue, но он не похож на то, что мне нужно, или, возможно, я неправильно интерпретирую документы.

Есть ли способ ограничить количество одновременных multiprocessing.Process процессов. multiprocessing.Process запущен?

4b9b3361

Ответ 1

Было бы разумнее использовать multiprocessing.Pool, который создает пул рабочих процессов на основе максимального количества ядер, доступных в вашей системе, а затем в основном загружает задачи, когда ядра становятся доступными.

Пример из стандартных документов (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) показывает, что вы также можете вручную установить количество ядер:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

И также полезно знать, что существует метод multiprocessing.cpu_count() для подсчета количества ядер в данной системе, если это необходимо в вашем коде.

Изменить: вот какой код кода, который, кажется, работает для вашего конкретного случая:

import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()

Ответ 2

в целом, это также может выглядеть так:

import multiprocessing
def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4


if __name__ == '__main__':
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i,param))
        jobs.append(p)
    for i in chunks(jobs,numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Конечно, этот способ довольно жесток (так как он ожидает каждого процесса в нежелательной, пока не продолжится со следующей частью). Тем не менее, он работает хорошо примерно для одинакового времени выполнения вызовов функций.

Ответ 3

Я думаю, что семафор - это то, что вы ищете, пример кода:

from multiprocessing import Semaphore

def f(name, sema):
    print 'hello', name
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

Другой способ, который может сделать код более структурированным, но потреблять слишком много ресурсов, если total_task_num очень большой, заключается в следующем:

from multiprocessing import Semaphore

def f(name, sema):
    sema.acquire()
    print 'hello', name
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()