Подтвердить что ты не робот

Чтение вывода из дочернего процесса с использованием python

Контекст

Я использую модуль subprocess, чтобы начать процесс с python. Я хочу иметь доступ к выходному (stdout, stderr), как только он будет записан/буферизирован.

  • Решение должно поддерживать Windows 7. Я также требую решения для систем Unix, но я подозреваю, что решение Windows сложнее решить.
  • Решение должно поддерживать Python 2.6. В настоящее время я ограничен Python 2.6, но решения, использующие более поздние версии Python, по-прежнему ценятся.
  • Решение не должно использовать сторонние библиотеки. В идеале мне бы понравилось решение с использованием стандартной библиотеки, но я открыт для предложений.
  • Решение должно работать практически для любого процесса. Предположим, что нет никакого контроля над выполняемым процессом.

Детский процесс

Например, представьте, что я хочу запустить файл python с именем counter.py через subprocess. Содержимое counter.py выглядит следующим образом:

import sys

for index in range(10):

    # Write data to standard out.
    sys.stdout.write(str(index))

    # Push buffered data to disk.
    sys.stdout.flush()

Родительский процесс

Родительский процесс, ответственный за выполнение примера counter.py, выглядит следующим образом:

import subprocess

command = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    ) 

Проблема

Используя пример counter.py, я могу получить доступ к данным до завершения процесса. Отлично! Это именно то, чего я хочу. Однако удаление вызова sys.stdout.flush() предотвращает доступ к данным в то время, когда я это хочу. Это плохо! Это именно то, чего я не хочу. Я понимаю, что вызов flush() заставляет записывать данные на диск и до того, как данные записаны на диск, он существует только в буфере. Помните, что я хочу иметь возможность запускать практически любой процесс. Я не ожидаю, что процесс будет выполнять этот вид промывки, но я все еще ожидаю, что данные будут доступны в режиме реального времени (или рядом с ним). Есть ли способ достичь этого?

Быстрая заметка о родительском процессе. Вы можете заметить, что я использую bufsize=0 для буферизации строк. Я надеялся, что это приведет к сбою на диске для каждой строки, но похоже, что это не так. Как этот аргумент работает?

Вы также заметите, что я использую subprocess.PIPE. Это связано с тем, что это единственное значение, которое создает объекты ввода-вывода между родительским и дочерним процессами. Я пришел к такому выводу, посмотрев метод Popen._get_handles в модуле subprocess (я имею в виду определение Windows здесь). Существуют две важные переменные: c2pread и c2pwrite, которые устанавливаются на основе значения stdout, переданного конструктору Popen. Например, если stdout не задано, переменная c2pread не задана. Это также имеет место при использовании файловых дескрипторов и файловых объектов. Я не знаю, насколько это важно или нет, но мой инстинкт кишки говорит мне, что я хотел бы читать и писать объекты IO для достижения того, чего я пытаюсь достичь, поэтому я выбрал subprocess.PIPE. Я был бы очень благодарен, если бы кто-нибудь мог объяснить это более подробно. Аналогично, если есть веская причина использовать что-то другое, кроме subprocess.PIPE, я все уши.

Метод извлечения данных из дочернего процесса

import time
import subprocess
import threading
import Queue


class StreamReader(threading.Thread):
    """
    Threaded object used for reading process output stream (stdout, stderr).   
    """

    def __init__(self, stream, queue, *args, **kwargs):
        super(StreamReader, self).__init__(*args, **kwargs)
        self._stream = stream
        self._queue = queue

        # Event used to terminate thread. This way we will have a chance to 
        # tie up loose ends. 
        self._stop = threading.Event()

    def stop(self):
        """
        Stop thread. Call this function to terminate the thread. 
        """
        self._stop.set()

    def stopped(self):
        """
        Check whether the thread has been terminated.
        """
        return self._stop.isSet()

    def run(self):
        while True:
            # Flush buffered data (not sure this actually works?)
            self._stream.flush()

            # Read available data.
            for line in iter(self._stream.readline, b''):
                self._queue.put(line)

            # Breather.
            time.sleep(0.25)

            # Check whether thread has been terminated.
            if self.stopped():
                break


cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )

stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()

# Read standard out of the child process whilst it is active.  
while True:

    # Attempt to read available data.  
    try:
        line = stdout_queue.get(timeout=0.1)
        print '%s' % line

    # If data was not read within time out period. Continue. 
    except Queue.Empty:
        # No data currently available.
        pass

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()

Здесь я выполняю логику, которая читает стандарт из дочернего процесса в потоке. Это позволяет сценарий, в котором чтение блокируется, пока не будут доступны данные. Вместо того, чтобы ждать какой-то потенциально длительный период времени, мы проверяем, есть ли доступные данные, которые нужно читать в течение периода ожидания, и продолжайте цикл, если этого не происходит.

Я также пробовал использовать другой подход, используя нечитающее чтение. Этот подход использует модуль ctypes для доступа к системным вызовам Windows. Обратите внимание, что я не совсем понимаю, что я здесь делаю - я просто попытался понять пример кода, который я видел в других сообщениях. В любом случае следующий фрагмент не решает проблему буферизации. Я понимаю, что это просто еще один способ борьбы с потенциально длительным временем чтения.

import os
import subprocess

import ctypes
import ctypes.wintypes
import msvcrt

cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )


def read_output_non_blocking(stream):
    data = ''
    available_bytes = 0

    c_read = ctypes.c_ulong()
    c_available = ctypes.c_ulong()
    c_message = ctypes.c_ulong()

    fileno = stream.fileno()
    handle = msvcrt.get_osfhandle(fileno)

    # Read available data.
    buffer_ = None
    bytes_ = 0
    status = ctypes.windll.kernel32.PeekNamedPipe(
        handle,
        buffer_,
        bytes_,
        ctypes.byref(c_read),
        ctypes.byref(c_available),
        ctypes.byref(c_message),
        )

    if status:
        available_bytes = int(c_available.value)

    if available_bytes > 0:
        data = os.read(fileno, available_bytes)
        print data

    return data

while True:

    # Read standard out for child process.
    stdout = read_output_non_blocking(process.stdout)
    print stdout

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

Комментарии очень ценятся.

Приветствия

4b9b3361

Ответ 1

Здесь проблема заключается в буферизации дочернего процесса. Ваш код subprocess уже работает так же хорошо, как и он, но если у вас есть дочерний процесс, который буферизует его вывод, то ничего не может сделать об этом subprocess.

Я не могу этого достаточно подчеркнуть: задержки буферизации, которые вы видите, являются ответственностью дочернего процесса, а также то, как он обрабатывает буферизацию, не имеет ничего общего с модулем subprocess.

Вы уже это обнаружили; поэтому добавление sys.stdout.flush() в дочерний процесс приводит к тому, что данные появляются раньше; дочерний процесс использует буферизованный ввод-вывод (кэш памяти для сбора записанных данных) перед отправкой его по каналу sys.stdout 1.

Python автоматически использует буферизацию строк, когда sys.stdout подключен к терминалу; буфер сбрасывается всякий раз, когда записывается новая строка. При использовании труб sys.stdout не подключается к терминалу, а вместо него используется буфер фиксированного размера.

Теперь, детскому процессу Python может быть предложено обрабатывать буферизацию по-разному; вы можете установить переменную окружения или использовать переключатель командной строки, чтобы изменить способ использования буферизации для sys.stdoutsys.stderr и sys.stdin). Из Документация командной строки Python:

-u
Force stdin, stdout и stderr полностью не загружены. В системах, где это имеет значение, также ставьте stdin, stdout и stderr в двоичном режиме.

[...]

PYTHONUNBUFFERED
Если для него задана непустая строка, это эквивалентно заданию параметра -u.

Если вы имеете дело с дочерними процессами, которые не являются процессами на Python, и вы испытываете проблемы с буферизацией, вам нужно посмотреть документацию этих процессов, чтобы узнать, можно ли их переключить на использование небуферизованного ввода-вывода или переключиться на более желательные стратегии буферизации.

Одна вещь, которую вы могли бы попробовать, - использовать команду script -c для предоставления псевдотерминала дочернему процессу. Однако это инструмент POSIX и, вероятно, недоступен в Windows.


1. Следует отметить, что при промывке трубы данные не записываются на диск; все данные остаются полностью в памяти здесь. Буферы ввода-вывода - это только кэши памяти, чтобы получить максимальную производительность от ввода-вывода, обрабатывая данные в больших кусках. Только если у вас есть файловый объект на диске, fileobj.flush() заставит его нажимать любые буферы в ОС, что обычно означает, что данные действительно записаны на диск.