Подтвердить что ты не робот

запуск нескольких системных команд в Python

Я пишу простой script, который выполняет системную команду в последовательности файлов. Чтобы ускорить работу, я хотел бы запускать их параллельно, но не сразу - мне нужно контролировать максимальное количество одновременно запущенных команд. Каким будет самый простой способ приблизиться к этому?

4b9b3361

Ответ 1

Если вы все равно вызываете подпроцессы, я не вижу необходимости использовать пул потоков. Основная реализация с использованием модуля subprocess была бы

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update([
            p for p in processes if p.poll() is not None])

В Windows os.wait() недоступен (ни какой другой метод ожидания завершения любого дочернего процесса). Вы можете обойти это путем опроса через определенные промежутки времени:

for name in files:
    processes.add(subprocess.Popen([command, name]))
    while len(processes) >= max_processes:
        time.sleep(.1)
        processes.difference_update([
            p for p in processes if p.poll() is not None])

Время ожидания зависит от ожидаемого времени выполнения подпроцессов.

Ответ 2

Ответ от Свена Марнаха почти прав, но есть проблема. Если один из последних процессов max_processes заканчивается, основная программа попытается запустить другой процесс, и цикл for завершится. Это закроет основной процесс, который может, в свою очередь, закрыть дочерние процессы. Для меня это поведение произошло с командой экрана.

Код в Linux будет таким (и будет работать только на python2.7):

import subprocess
import os
import time

files = <list of file names>
command = "/bin/touch"
processes = set()
max_processes = 5

for name in files:
    processes.add(subprocess.Popen([command, name]))
    if len(processes) >= max_processes:
        os.wait()
        processes.difference_update(
            [p for p in processes if p.poll() is not None])
#Check if all the child processes were closed
for p in processes:
    if p.poll() is None:
        p.wait()

Ответ 3

Вам нужно объединить Semaphore объект с threads. Семафор - это объект, который позволяет ограничить количество потоков, выполняемых в данном разделе кода. В этом случае мы будем использовать семафор, чтобы ограничить количество потоков, которые могут запускать вызов os.system.

Сначала мы импортируем необходимые нам модули:

#!/usr/bin/python

import threading
import os

Затем мы создаем объект Семафор. Число четыре здесь - количество потоков, которые могут получить семафор за один раз. Это ограничивает количество подпроцессов, которые могут быть запущены сразу.

semaphore = threading.Semaphore(4)

Эта функция просто переносит вызов на подпроцесс при вызовах семафора.

def run_command(cmd):
    semaphore.acquire()
    try:
        os.system(cmd)
    finally:
        semaphore.release()

Если вы используете Python 2.6+, это может стать еще проще, поскольку вы можете использовать оператор 'with' для выполнения вызовов на покупку и выпуск.

def run_command(cmd):
    with semaphore:
        os.system(cmd)

Наконец, чтобы показать, что это работает как ожидалось, мы будем называть команду "sleep 10" восемь раз.

for i in range(8):
    threading.Thread(target=run_command, args=("sleep 10", )).start()

Запуск script с использованием программы "время" показывает, что она занимает всего 20 секунд, так как параллельно выполняются две партии из четырех спящих.

[email protected]:~/personal/stackoverflow$ time python 4992400.py 

real    0m20.032s                                                                                                                                                                   
user    0m0.020s                                                                                                                                                                    
sys     0m0.008s 

Ответ 4

Я объединил решения Sven и Thuener с тем, что ждет завершающих процессов, а также останавливается, если один из процессов выйдет из строя:

def removeFinishedProcesses(processes):
    """ given a list of (commandString, process), 
        remove those that have completed and return the result 
    """
    newProcs = []
    for pollCmd, pollProc in processes:
        retCode = pollProc.poll()
        if retCode==None:
            # still running
            newProcs.append((pollCmd, pollProc))
        elif retCode!=0:
            # failed
            raise Exception("Command %s failed" % pollCmd)
        else:
            logging.info("Command %s completed successfully" % pollCmd)
    return newProcs

def runCommands(commands, maxCpu):
            processes = []
            for command in commands:
                logging.info("Starting process %s" % command)
                proc =  subprocess.Popen(shlex.split(command))
                procTuple = (command, proc)
                processes.append(procTuple)
                while len(processes) >= maxCpu:
                    time.sleep(.2)
                    processes = removeFinishedProcesses(processes)

            # wait for all processes
            while len(processes)>0:
                time.sleep(0.5)
                processes = removeFinishedProcesses(processes)
            logging.info("All processes completed")

Ответ 5

То, что вы просите, это пул потоков. Существует фиксированное количество потоков, которые могут использоваться для выполнения задач. Когда не выполняется задание, он ждет очереди задач, чтобы получить новую часть кода для выполнения.

Существует этот модуль пула потоков, но есть комментарий, в котором говорится, что он еще не считается завершенным. Там могут быть другие пакеты, но это был первый, который я нашел.

Ответ 6

Если ваши команды операционной системы вы можете просто создать экземпляры процессов с модулем подпроцесса, вызовите их по своему усмотрению. Там не должно быть необходимости нить (ее непитонический), и многопроцессор кажется слишком сложным для этой задачи.

Ответ 7

Этот ответ очень похож на другие ответы, представленные здесь, но он использует список вместо наборов. По какой-то причине при использовании этих ответов я получал ошибку времени выполнения относительно изменения размера набора.

from subprocess import PIPE
import subprocess
import time


def submit_job_max_len(job_list, max_processes):
  sleep_time = 0.1
  processes = list()
  for command in job_list:
    print 'running {n} processes. Submitting {proc}.'.format(n=len(processes),
        proc=str(command))
    processes.append(subprocess.Popen(command, shell=False, stdout=None,
      stdin=PIPE))
    while len(processes) >= max_processes:
      time.sleep(sleep_time)
      processes = [proc for proc in processes if proc.poll() is None]
  while len(processes) > 0:
    time.sleep(sleep_time)
    processes = [proc for proc in processes if proc.poll() is None]


cmd = '/bin/bash run_what.sh {n}'
job_list = ((cmd.format(n=i)).split() for i in range(100))
submit_job_max_len(job_list, max_processes=50)