Подтвердить что ты не робот

Многопроцессорство в Python при ограничении количества запущенных процессов

Я хотел бы одновременно запускать несколько экземпляров program.py, одновременно ограничивая количество экземпляров, запущенных одновременно (например, количество ядер ЦП, доступных в моей системе). Например, если у меня есть 10 ядер и вам нужно выполнить 1000 запусков program.py, всего будет создано и запущено всего 10 экземпляров в любой момент времени.

Я пробовал использовать модуль многопроцессорности, многопоточность и использование очередей, но мне не показалось, что мне легко поддаться легкой реализации. Самая большая проблема, с которой я сталкиваюсь, - найти способ ограничить количество запущенных процессов одновременно. Это важно, потому что, если я создам 1000 процессов одновременно, это становится эквивалентным вилкой. Мне не нужны результаты, возвращенные из процессов программным способом (они выводятся на диск), и все процессы выполняются независимо друг от друга.

Может кто-нибудь, пожалуйста, дайте мне предложения или пример того, как я мог реализовать это в python или даже bash? Я бы опубликовал код, который я написал до сих пор, используя очереди, но он не работает так, как предполагалось, и может быть уже неверным путем.

Большое спасибо.

4b9b3361

Ответ 1

Я знаю, что вы упомянули, что подход Pool.map не имеет для вас никакого смысла. Карта - это просто простой способ дать ему источник работы и возможность обращения к каждому из элементов. func для карты может быть любой точкой входа, чтобы выполнить фактическую работу над данным аргументом.

Если вам это не кажется правильным, у меня есть довольно подробный ответ об использовании шаблона Producer-Consumer: fooobar.com/questions/106290/...

По существу, вы создаете очередь и начинаете N число рабочих. Затем вы либо загружаете очередь из основного потока, либо создаете процесс Producer, который передает очередь. Работники просто продолжают работать из очереди, и никогда не будет более параллельной работы, чем количество запущенных вами процессов.

У вас также есть возможность установить лимит на очередь, чтобы он блокировал производителя, когда есть слишком много выдающейся работы, если вам нужно также установить ограничения на скорость и ресурсы, которые потребляет производитель.

Функция работы, вызываемая вызовом, может делать все, что вы хотите. Это может быть оболочка вокруг некоторой системной команды или она может импортировать вашу библиотеку python и запускать основную процедуру. Существуют специальные системы управления процессами, которые позволяют настраивать конфигурации для запуска произвольных исполняемых файлов в ограниченных ресурсах, но это всего лишь базовый подход к выполнению этого подхода на основе python.

Отрывки из этого моего другого ответа:

Основной пул:

from multiprocessing import Pool

def do_work(val):
    # could instantiate some other library class,
    # call out to the file system,
    # or do something simple right here.
    return "FOO: %s" % val

pool = Pool(4)
work = get_work_args()
results = pool.map(do_work, work)

Использование диспетчера процессов и производителя

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()

        # exit signal 
        if item == None:
            return

        # fake work
        time.sleep(.5)
        result = item

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    # this could also be started in a producer process
    # instead of blocking
    iters = itertools.chain(get_work_args(), (None,)*num_workers)
    for item in iters:
        work.put(item)

    for p in pool:
        p.join()

    print results

Ответ 2

Вы должны использовать диспетчер процессов. Один из подходов будет использовать API, предоставляемый Circus, чтобы сделать это "программно", сайт документации теперь отключен, но я думаю, что его просто во всяком случае, вы можете использовать Цирк, чтобы справиться с этим. Другим подходом будет использование supervisord и установка параметра numprocs процесса на количество ядер, которые у вас есть.

Пример использования Цирка:

from circus import get_arbiter

arbiter = get_arbiter("myprogram", numprocesses=3)
try:
    arbiter.start()
finally:
    arbiter.stop()

Ответ 3

Bash script, а не Python, но я часто использую его для простой параллельной обработки:

#!/usr/bin/env bash
waitForNProcs()
{
 nprocs=$(pgrep -f $procName | wc -l)
 while [ $nprocs -gt $MAXPROCS ]; do
  sleep $SLEEPTIME
  nprocs=$(pgrep -f $procName | wc -l)
 done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
 waitForNProcs
 ./$procName $file &
done

Или для очень простых случаев другой параметр - xargs, где P задает количество procs

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 

Ответ 4

В то время как есть много ответов об использовании multiprocessing.pool, не так много фрагментов кода о том, как использовать многопроцессорность. Процесс, который действительно более полезен при использовании памяти. запуск 1000 процессов приведет к перегрузке процессора и уничтожению памяти. Если каждый процесс и его конвейеры данных интенсивно занимаются памятью, OS или Python будут ограничивать количество параллельных процессов. Я разработал приведенный ниже код, чтобы ограничить одновременное количество заданий, переданных в CPU партиями. Размер партии можно масштабировать пропорционально количеству ядер ЦП. На моем ПК с Windows количество заданий на партию может быть эффективным в 4 раза выше, чем у процессора.

import multiprocessing
def func_to_be_multiprocessed(q):
    q.put('s')
q = multiprocessing.Queue()
worker = []
for p in range(number_of_jobs):
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \
        args=(q,data)...))
num_cores = multiprocessing.cpu_count()
Scaling_factor_batch_jobs = 3.0
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
num_of_batches = number_of_jobs // num_jobs_per_batch
for i_batch in range(num_of_batches):
    floor_job = i_batch * num_jobs_per_batch
    ceil_job  = floor_job + num_jobs_per_batch
    for p in worker[floor_job : ceil_job]:
                                         worker.start()
    for p in worker[floor_job : ceil_job]:
                                         worker.join()
for p in worker[ceil_job :]:
                           worker.start()
for p in worker[ceil_job :]:
                           worker.join()
for p in multiprocessing.active_children():
                           p.terminate()
result = []
for p in worker:
   result.append(q.get())

Единственная проблема заключается в том, что если какая-либо работа в какой-либо партии не может завершиться и приведет к зависанию, остальные задания будут работать. Таким образом, функция, подлежащая обработке, должна иметь правильные процедуры обработки ошибок.