Python: параллельный запуск подпроцесса - программирование
Подтвердить что ты не робот

Python: параллельный запуск подпроцесса

У меня есть следующий код, который записывает md5sums в файл журнала

for file in files_output:
    p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
  • Будут ли они написаны параллельно? т.е. если md5sum занимает много времени для одного из файлов, будет ли еще один запуск до того, как будет завершен предыдущий?

  • Если ответ на вышеизложенное да, могу ли я предположить, что порядок md5sums, записанный в лог файл, может отличаться в зависимости от того, как долго md5sum берет для каждого файла? (некоторые файлы могут быть огромными, некоторые небольшие)

4b9b3361

Ответ 1

Все подпроцессы выполняются параллельно. (Чтобы избежать этого, нужно ждать явно для их завершения.) Они даже могут записывать в файл журнала одновременно, тем самым искажая вывод. Чтобы этого избежать, вы должны позволить каждому процессу записывать в другой файл журнала и собирать все выходы, когда все процессы завершены.

q = Queue.Queue()
result = {}  # used to store the results
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # EOF?
      return
    subprocess_stuff_using(fileName)
    wait_for_finishing_subprocess()
    checksum = collect_md5_result_for(fileName)
    result[fileName] = checksum  # store it

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one EOF marker for each thread

После этого результаты должны быть сохранены в result.

Ответ 2

  • Да, эти процессы md5sum будут запущены параллельно.
  • Да, порядок записи md5sums будет непредсказуемым. И, как правило, считается, что плохая практика заключается в совместном использовании одного ресурса, такого как файл, из многих процессов.

Также ваш способ сделать p.wait() после цикла for будет ждать завершения последнего процесса md5sum, а остальные из них все еще могут работать.

Но вы можете немного изменить этот код, чтобы иметь преимущества параллельной обработки и предсказуемости синхронизированного вывода, если вы собираете вывод md5sum во временные файлы и собираете его обратно в один файл после завершения всех процессов.

import subprocess
import os

processes = []
for file in files_output:
    f = os.tmpfile()
    p = subprocess.Popen(['md5sum',file],stdout=f)
    processes.append((p, f))

for p, f in processes:
    p.wait()
    f.seek(0)
    logfile.write(f.read())
    f.close()

Ответ 3

Простым способом сбора данных из параллельных подпроцессов md5sum является использование пула потоков и запись в файл из основного процесса:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
               logfile.write(output)
  • вывод из md5sum невелик, поэтому вы можете сохранить его в памяти
  • imap сохраняет порядок
  • number_of_processes может отличаться от количества файлов или ядер процессора (большие значения не означают быстрее: это зависит от относительной производительности IO (дисков) и CPU)

Вы можете попробовать передать несколько файлов сразу в подпроцессы md5sum.

В этом случае вам не нужен внешний подпроцесс; вы можете вычислить md5 в Python:

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()

Чтобы использовать несколько процессов вместо потоков (чтобы простой Python md5sum() выполнялся параллельно с использованием нескольких процессоров), просто удалите .dummy из импорта в указанном выше коде.