Контекст
Я использую модуль subprocess
, чтобы начать процесс с python. Я хочу иметь доступ к выходному (stdout, stderr), как только он будет записан/буферизирован.
- Решение должно поддерживать Windows 7. Я также требую решения для систем Unix, но я подозреваю, что решение Windows сложнее решить.
- Решение должно поддерживать Python 2.6. В настоящее время я ограничен Python 2.6, но решения, использующие более поздние версии Python, по-прежнему ценятся.
- Решение не должно использовать сторонние библиотеки. В идеале мне бы понравилось решение с использованием стандартной библиотеки, но я открыт для предложений.
- Решение должно работать практически для любого процесса. Предположим, что нет никакого контроля над выполняемым процессом.
Детский процесс
Например, представьте, что я хочу запустить файл python с именем counter.py
через subprocess
. Содержимое counter.py
выглядит следующим образом:
import sys
for index in range(10):
# Write data to standard out.
sys.stdout.write(str(index))
# Push buffered data to disk.
sys.stdout.flush()
Родительский процесс
Родительский процесс, ответственный за выполнение примера counter.py
, выглядит следующим образом:
import subprocess
command = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
Проблема
Используя пример counter.py
, я могу получить доступ к данным до завершения процесса. Отлично! Это именно то, чего я хочу. Однако удаление вызова sys.stdout.flush()
предотвращает доступ к данным в то время, когда я это хочу. Это плохо! Это именно то, чего я не хочу. Я понимаю, что вызов flush()
заставляет записывать данные на диск и до того, как данные записаны на диск, он существует только в буфере. Помните, что я хочу иметь возможность запускать практически любой процесс. Я не ожидаю, что процесс будет выполнять этот вид промывки, но я все еще ожидаю, что данные будут доступны в режиме реального времени (или рядом с ним). Есть ли способ достичь этого?
Быстрая заметка о родительском процессе. Вы можете заметить, что я использую bufsize=0
для буферизации строк. Я надеялся, что это приведет к сбою на диске для каждой строки, но похоже, что это не так. Как этот аргумент работает?
Вы также заметите, что я использую subprocess.PIPE
. Это связано с тем, что это единственное значение, которое создает объекты ввода-вывода между родительским и дочерним процессами. Я пришел к такому выводу, посмотрев метод Popen._get_handles
в модуле subprocess
(я имею в виду определение Windows здесь). Существуют две важные переменные: c2pread
и c2pwrite
, которые устанавливаются на основе значения stdout
, переданного конструктору Popen
. Например, если stdout
не задано, переменная c2pread
не задана. Это также имеет место при использовании файловых дескрипторов и файловых объектов. Я не знаю, насколько это важно или нет, но мой инстинкт кишки говорит мне, что я хотел бы читать и писать объекты IO для достижения того, чего я пытаюсь достичь, поэтому я выбрал subprocess.PIPE
. Я был бы очень благодарен, если бы кто-нибудь мог объяснить это более подробно. Аналогично, если есть веская причина использовать что-то другое, кроме subprocess.PIPE
, я все уши.
Метод извлечения данных из дочернего процесса
import time
import subprocess
import threading
import Queue
class StreamReader(threading.Thread):
"""
Threaded object used for reading process output stream (stdout, stderr).
"""
def __init__(self, stream, queue, *args, **kwargs):
super(StreamReader, self).__init__(*args, **kwargs)
self._stream = stream
self._queue = queue
# Event used to terminate thread. This way we will have a chance to
# tie up loose ends.
self._stop = threading.Event()
def stop(self):
"""
Stop thread. Call this function to terminate the thread.
"""
self._stop.set()
def stopped(self):
"""
Check whether the thread has been terminated.
"""
return self._stop.isSet()
def run(self):
while True:
# Flush buffered data (not sure this actually works?)
self._stream.flush()
# Read available data.
for line in iter(self._stream.readline, b''):
self._queue.put(line)
# Breather.
time.sleep(0.25)
# Check whether thread has been terminated.
if self.stopped():
break
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()
# Read standard out of the child process whilst it is active.
while True:
# Attempt to read available data.
try:
line = stdout_queue.get(timeout=0.1)
print '%s' % line
# If data was not read within time out period. Continue.
except Queue.Empty:
# No data currently available.
pass
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()
Здесь я выполняю логику, которая читает стандарт из дочернего процесса в потоке. Это позволяет сценарий, в котором чтение блокируется, пока не будут доступны данные. Вместо того, чтобы ждать какой-то потенциально длительный период времени, мы проверяем, есть ли доступные данные, которые нужно читать в течение периода ожидания, и продолжайте цикл, если этого не происходит.
Я также пробовал использовать другой подход, используя нечитающее чтение. Этот подход использует модуль ctypes
для доступа к системным вызовам Windows. Обратите внимание, что я не совсем понимаю, что я здесь делаю - я просто попытался понять пример кода, который я видел в других сообщениях. В любом случае следующий фрагмент не решает проблему буферизации. Я понимаю, что это просто еще один способ борьбы с потенциально длительным временем чтения.
import os
import subprocess
import ctypes
import ctypes.wintypes
import msvcrt
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
def read_output_non_blocking(stream):
data = ''
available_bytes = 0
c_read = ctypes.c_ulong()
c_available = ctypes.c_ulong()
c_message = ctypes.c_ulong()
fileno = stream.fileno()
handle = msvcrt.get_osfhandle(fileno)
# Read available data.
buffer_ = None
bytes_ = 0
status = ctypes.windll.kernel32.PeekNamedPipe(
handle,
buffer_,
bytes_,
ctypes.byref(c_read),
ctypes.byref(c_available),
ctypes.byref(c_message),
)
if status:
available_bytes = int(c_available.value)
if available_bytes > 0:
data = os.read(fileno, available_bytes)
print data
return data
while True:
# Read standard out for child process.
stdout = read_output_non_blocking(process.stdout)
print stdout
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
Комментарии очень ценятся.
Приветствия