Я пишу простой script, который выполняет системную команду в последовательности файлов. Чтобы ускорить работу, я хотел бы запускать их параллельно, но не сразу - мне нужно контролировать максимальное количество одновременно запущенных команд. Каким будет самый простой способ приблизиться к этому?
запуск нескольких системных команд в Python
Ответ 1
Если вы все равно вызываете подпроцессы, я не вижу необходимости использовать пул потоков. Основная реализация с использованием модуля subprocess
была бы
import subprocess
import os
import time
files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
processes.add(subprocess.Popen([command, name]))
if len(processes) >= max_processes:
os.wait()
processes.difference_update([
p for p in processes if p.poll() is not None])
В Windows os.wait()
недоступен (ни какой другой метод ожидания завершения любого дочернего процесса). Вы можете обойти это путем опроса через определенные промежутки времени:
for name in files:
processes.add(subprocess.Popen([command, name]))
while len(processes) >= max_processes:
time.sleep(.1)
processes.difference_update([
p for p in processes if p.poll() is not None])
Время ожидания зависит от ожидаемого времени выполнения подпроцессов.
Ответ 2
Ответ от Свена Марнаха почти прав, но есть проблема. Если один из последних процессов max_processes заканчивается, основная программа попытается запустить другой процесс, и цикл for завершится. Это закроет основной процесс, который может, в свою очередь, закрыть дочерние процессы. Для меня это поведение произошло с командой экрана.
Код в Linux будет таким (и будет работать только на python2.7):
import subprocess
import os
import time
files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5
for name in files:
processes.add(subprocess.Popen([command, name]))
if len(processes) >= max_processes:
os.wait()
processes.difference_update(
[p for p in processes if p.poll() is not None])
#Check if all the child processes were closed
for p in processes:
if p.poll() is None:
p.wait()
Ответ 3
Вам нужно объединить Semaphore объект с threads. Семафор - это объект, который позволяет ограничить количество потоков, выполняемых в данном разделе кода. В этом случае мы будем использовать семафор, чтобы ограничить количество потоков, которые могут запускать вызов os.system.
Сначала мы импортируем необходимые нам модули:
#!/usr/bin/python
import threading
import os
Затем мы создаем объект Семафор. Число четыре здесь - количество потоков, которые могут получить семафор за один раз. Это ограничивает количество подпроцессов, которые могут быть запущены сразу.
semaphore = threading.Semaphore(4)
Эта функция просто переносит вызов на подпроцесс при вызовах семафора.
def run_command(cmd):
semaphore.acquire()
try:
os.system(cmd)
finally:
semaphore.release()
Если вы используете Python 2.6+, это может стать еще проще, поскольку вы можете использовать оператор 'with' для выполнения вызовов на покупку и выпуск.
def run_command(cmd):
with semaphore:
os.system(cmd)
Наконец, чтобы показать, что это работает как ожидалось, мы будем называть команду "sleep 10" восемь раз.
for i in range(8):
threading.Thread(target=run_command, args=("sleep 10", )).start()
Запуск script с использованием программы "время" показывает, что она занимает всего 20 секунд, так как параллельно выполняются две партии из четырех спящих.
[email protected]:~/personal/stackoverflow$ time python 4992400.py
real 0m20.032s
user 0m0.020s
sys 0m0.008s
Ответ 4
Я объединил решения Sven и Thuener с тем, что ждет завершающих процессов, а также останавливается, если один из процессов выйдет из строя:
def removeFinishedProcesses(processes):
""" given a list of (commandString, process),
remove those that have completed and return the result
"""
newProcs = []
for pollCmd, pollProc in processes:
retCode = pollProc.poll()
if retCode==None:
# still running
newProcs.append((pollCmd, pollProc))
elif retCode!=0:
# failed
raise Exception("Command %s failed" % pollCmd)
else:
logging.info("Command %s completed successfully" % pollCmd)
return newProcs
def runCommands(commands, maxCpu):
processes = []
for command in commands:
logging.info("Starting process %s" % command)
proc = subprocess.Popen(shlex.split(command))
procTuple = (command, proc)
processes.append(procTuple)
while len(processes) >= maxCpu:
time.sleep(.2)
processes = removeFinishedProcesses(processes)
# wait for all processes
while len(processes)>0:
time.sleep(0.5)
processes = removeFinishedProcesses(processes)
logging.info("All processes completed")
Ответ 5
То, что вы просите, это пул потоков. Существует фиксированное количество потоков, которые могут использоваться для выполнения задач. Когда не выполняется задание, он ждет очереди задач, чтобы получить новую часть кода для выполнения.
Существует этот модуль пула потоков, но есть комментарий, в котором говорится, что он еще не считается завершенным. Там могут быть другие пакеты, но это был первый, который я нашел.
Ответ 6
Если ваши команды операционной системы вы можете просто создать экземпляры процессов с модулем подпроцесса, вызовите их по своему усмотрению. Там не должно быть необходимости нить (ее непитонический), и многопроцессор кажется слишком сложным для этой задачи.
Ответ 7
Этот ответ очень похож на другие ответы, представленные здесь, но он использует список вместо наборов. По какой-то причине при использовании этих ответов я получал ошибку времени выполнения относительно изменения размера набора.
from subprocess import PIPE
import subprocess
import time
def submit_job_max_len(job_list, max_processes):
sleep_time = 0.1
processes = list()
for command in job_list:
print 'running {n} processes. Submitting {proc}.'.format(n=len(processes),
proc=str(command))
processes.append(subprocess.Popen(command, shell=False, stdout=None,
stdin=PIPE))
while len(processes) >= max_processes:
time.sleep(sleep_time)
processes = [proc for proc in processes if proc.poll() is None]
while len(processes) > 0:
time.sleep(sleep_time)
processes = [proc for proc in processes if proc.poll() is None]
cmd = '/bin/bash run_what.sh {n}'
job_list = ((cmd.format(n=i)).split() for i in range(100))
submit_job_max_len(job_list, max_processes=50)