Подтвердить что ты не робот

Неблокирование чтения на подпроцессе .PIPE в python

Я использую модуль подпроцесса для запуска подпроцесса и подключения к нему выходного потока (stdout). Я хочу, чтобы иметь возможность выполнять неблокирующие чтения на его stdout. Есть ли способ сделать .readline non-blocking или проверить, есть ли данные в потоке, прежде чем я вызову .readline? Я бы хотел, чтобы это было портативным или, по крайней мере, работало под Windows и Linux.

вот как я это делаю сейчас (он блокирует .readline, если нет данных):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
4b9b3361

Ответ 1

fcntl, select, asyncproc в этом случае не поможет.

Надежным способом чтения потока без блокировки вне зависимости от операционной системы является использование Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Ответ 2

У меня часто возникала аналогичная проблема; Программы Python, которые я пишу, часто должны иметь возможность выполнять некоторые основные функции, одновременно принимая пользовательский ввод из командной строки (stdin). Простое использование функций обработки ввода пользователем в другом потоке не решает проблему, потому что readline() блокирует и не имеет таймаута. Если основная функциональность завершена, и больше нет необходимости ждать ввода нового пользователя, я обычно хочу, чтобы моя программа выходила, но она не может, потому что readline() по-прежнему блокирует другой поток, ожидающий строки. Решение, которое я нашел для этой проблемы, заключается в том, чтобы сделать stdin неблокирующим файлом с помощью модуля fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

По-моему, это немного чище, чем использование модулей выбора или сигнала для решения этой проблемы, но затем он работает только в UNIX...

Ответ 3

Python 3.4 представляет новый предварительный API для асинхронного ввода-вывода - asyncio модуль.

Подход похож на twisted -базовый ответ от @Bryan Ward - определяет протокол, и его методы вызывается, как только данные готовы:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Смотрите "Подпроцесс" в документах.

Существует высокоуровневый интерфейс asyncio.create_subprocess_exec(), который возвращает Process объекты, который позволяет читать строку асинхронно, используя StreamReader.readline() coroutine async/await Синтаксис Python 3.5+):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() выполняет следующие задачи:

  • начать подпроцесс, перенаправить его stdout в канал
  • читать строку из подпроцесса 'stdout асинхронно
  • убить подпроцесс
  • дождитесь его выхода

Каждый шаг может быть ограничен таймаутом секунд, если необходимо.

Ответ 4

Попробуйте модуль asyncproc. Например:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Модуль заботится обо всех потоках, как это предлагает S.Lott.

Ответ 5

Используйте select и read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Для readline() - например:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Ответ 6

Вы можете сделать это очень легко в Twisted. В зависимости от существующей базы кода это может быть не так просто, но если вы создаете скрученное приложение, то подобные вещи становятся почти тривиальными. Вы создаете класс ProcessProtocol и переопределяете метод outReceived(). Twisted (в зависимости от используемого реактора) обычно представляет собой просто большой цикл select() с обратными вызовами, установленными для обработки данных из разных файловых дескрипторов (часто сетевых сокетов). Таким образом, метод outReceived() просто устанавливает обратный вызов для обработки данных, поступающих из STDOUT. Простой пример, демонстрирующий это поведение, выглядит следующим образом:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted documentation содержит некоторую полезную информацию об этом.

Если вы создаете все приложение вокруг Twisted, он делает асинхронную связь с другими процессами, локальными или удаленными, действительно элегантными. С другой стороны, если ваша программа не построена поверх Twisted, на самом деле это не будет полезно. Надеюсь, это может быть полезно для других читателей, даже если оно не применимо для вашего конкретного приложения.

Ответ 7

Одно из решений - сделать другой процесс для выполнения вашего чтения процесса или сделать поток процесса с тайм-аутом.

Здесь представлена ​​потоковая версия функции тайм-аута:

http://code.activestate.com/recipes/473878/

Однако, вам нужно прочитать stdout по мере поступления? Другим решением может быть сброс вывода в файл и ожидание завершения процесса с помощью p.wait().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

Ответ 8

Отказ от ответственности: это работает только для торнадо

Вы можете сделать это, установив fd для неблокирования, а затем используйте ioloop для регистрации обратных вызовов. Я упаковал это в яйце под названием tornado_subprocess, и вы можете установить его через PyPI:

easy_install tornado_subprocess

теперь вы можете сделать что-то вроде этого:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

вы также можете использовать его с RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Ответ 9

Существующие решения для меня не работали (подробнее см. ниже). В конечном итоге работала над реализацией readline, используя read (1) (на основе этого ответа). Последнее не блокирует:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Почему существующие решения не работают:

  • Решения, требующие чтения (включая Очередь), всегда блокируются. Трудно (невозможно?) Убить поток, который выполняет readline. Он только убивается, когда процесс, который его создал, заканчивается, но не тогда, когда процесс, производящий результат, уничтожается.
  • Смешение низкоуровневого fcntl с высокоуровневыми вызовами readline может не работать должным образом, как указывал anonnn.
  • Использование select.poll() является аккуратным, но не работает в Windows в соответствии с документами python.
  • Использование сторонних библиотек кажется излишним для этой задачи и добавляет дополнительные зависимости.

Ответ 10

Эта версия неблокирующего чтения не требует специальных модулей и будет работать из коробки на большинстве дистрибутивов Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

Ответ 11

Я добавляю эту проблему, чтобы прочитать какой-то подпроцесс .Popen stdout. Вот мое неблокирующее решение для чтения:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Ответ 12

Вот мой код, используемый для вывода каждого выхода из подпроцесса ASAP, включая частичные линии. Он нагнетает в то же время и stdout и stderr в почти правильном порядке.

Протестировано и правильно работает на Linux и Windowsх Python 2.7.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Ответ 13

Добавьте сюда этот ответ, поскольку он предоставляет возможность устанавливать неблокирующие каналы в Windows и Unix.

Все детали ctypes благодаря ответу @techtonik.

Существует небольшая версия, которая будет использоваться как в системах Unix, так и в Windows.

  • Совместимость с Python3 (требуется только небольшое изменение).
  • Включает версию posix и определяет исключение для использования для.

Таким образом, вы можете использовать ту же функцию и исключение для кода Unix и Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /questions/10307/non-blocking-read-on-ospipe-on-windows
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Чтобы избежать чтения неполных данных, я закончил писать собственный генератор readline (который возвращает строку байтов для каждой строки).

Его генератор, чтобы вы могли, например...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Ответ 14

Модуль select помогает определить, где находится следующий полезный ввод.

Однако вы почти всегда счастливее с отдельными потоками. Один блокирует чтение stdin, другой делает, где бы вы не хотели блокировать.

Ответ 15

зачем беспокоить очередь потоков и? в отличие от readline(), BufferedReader.read1() не блокирует ожидание \r\n, он возвращает ASAP, если есть выход.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

Ответ 16

В моем случае мне нужен модуль протоколирования, который улавливает вывод из фоновых приложений и увеличивает его (добавляет отметки времени, цвета и т.д.).

В итоге я получил фоновый поток, который делает фактический ввод-вывод. Следующий код предназначен только для платформ POSIX. Я раздевал несущественные части.

Если кто-то собирается использовать этот зверь для длительных прогонов, рассмотрим возможность управления открытыми дескрипторами. В моем случае это была не большая проблема.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Ответ 18

EDIT: эта реализация по-прежнему блокируется. Вместо этого используйте J.F.Sebastian .

Я попробовал лучший ответ, но дополнительный риск и обслуживание кода потока были тревожными.

Просматривая io module (и ограничиваясь 2.6), я нашел BufferedReader. Это мое безболезненное, неблокирующее решение.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Ответ 19

Работая с ответом Дж. Ф. Себастьяна и несколькими другими источниками, я собрал простой менеджер подпроцесса. Он обеспечивает неблокирующее чтение запроса, а также выполняет несколько процессов параллельно. Он не использует какой-либо OS-специфический вызов (который я знаю) и, следовательно, должен работать где угодно.

Он доступен из pypi, поэтому просто pip install shelljob. См. Страницу для примеров и полных документов.

Ответ 20

Недавно я наткнулся на ту же проблему Мне нужно прочитать одну строку во времени из потока (хвост в подпроцессе) в неблокирующем режиме Я хотел избежать следующих проблем: не записывать процессор, не читать поток за один байт (например, readline) и т.д.

Вот моя реализация https://gist.github.com/grubberr/5501e1a9760c3eab5e0a он не поддерживает окна (опрос), не обрабатывает EOF, но он работает для меня хорошо

Ответ 21

Это пример запуска интерактивной команды в подпроцессе, а stdout является интерактивным с использованием псевдотерминала. Вы можете обратиться к: fooobar.com/questions/3871/...

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Ответ 22

Моя проблема немного другая, поскольку я хотел собрать как stdout, так и stderr из работающего процесса, но в конечном итоге это же, так как я хотел бы сделать вывод в виджет как сгенерированный.

Я не хотел прибегать ко многим предложенным обходным решениям с использованием очередей или дополнительных потоков, поскольку они не должны быть необходимы для выполнения такой общей задачи, как запуск другого script и сбор его результатов.

После прочтения предлагаемых решений и документов python я решил проблему с приведенной ниже реализацией. Да, он работает только для POSIX, поскольку я использую вызов функции select.

Я согласен с тем, что документы сбивают с толку, и реализация неловкая для такой общей задачи сценариев. Я считаю, что более старые версии python имеют разные значения по умолчанию для Popen и разные объяснения, что создает много путаницы. Это, похоже, хорошо работает как для Python 2.7.12, так и для 3.5.2.

Ключ должен был установить bufsize=1 для буферизации строк, а затем universal_newlines=True обрабатывать как текстовый файл вместо двоичного файла, который, по-видимому, становится значением по умолчанию при настройке bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG и VERBOSE - это просто макросы, которые выводят вывод на терминал.

Это решение IMHO 99.99% эффективно, поскольку оно все еще использует функцию блокировки readline, поэтому мы предполагаем, что подпроцесс хорош и выводит полные строки.

Я приветствую обратную связь, чтобы улучшить решение, поскольку я все еще новичок в Python.

Ответ 23

Это решение использует модуль select для "чтения любых доступных данных" из потока ввода-вывода. Эта функция блокирует сначала до тех пор, пока данные не будут доступны, но затем прочитает только доступные данные и не будет блокироваться дальше.

Учитывая тот факт, что он использует модуль select, это работает только в Unix.

Код полностью совместим с PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Ответ 24

Я также столкнулся с проблемой, описанной Джесси, и решил ее, используя "выбрать", как Брэдли, Энди и другие, но в режиме блокировки, чтобы избежать цикла занятости. Он использует фиктивную трубу в качестве поддельного stdin. Выбирайте блоки и ждите, пока STDIN или трубу не будут готовы. При нажатии клавиши stdin разблокирует выбор, и значение ключа можно получить с помощью read (1). Когда другой поток записывается в трубку, тогда канал разблокирует выбор, и его можно принять за указание, что необходимость в stdin завершена. Вот несколько этапов:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

Ответ 25

У меня проблема с исходным вопросом, но я не хочу ссылаться на темы. Я смешал решение Джесси с прямым чтением() из канала, а мой собственный обработчик буфера для чтения строк (однако мой подпроцесс - пинг - всегда писал полные строки <размер системной страницы). Я избегаю оживленного ожидания, только просматривая в часах, зарегистрированных gobject. В эти дни я обычно запускаю код внутри gobject MainLoop, чтобы избежать потоков.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

Наблюдатель

def watch(f, *other):
print 'reading',f.read()
return True

И основная программа устанавливает пинг, а затем вызывает цикл gobject mail.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Любая другая работа связана с обратными вызовами в gobject.

Ответ 26

В современном Python дела обстоят намного лучше.

Вот простая дочерняя программа "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

И программа для взаимодействия с ним:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Это распечатывает:

b'hello bob\n'
b'hello alice\n'

Обратите внимание, что фактический шаблон, который также содержится почти во всех предыдущих ответах, как здесь, так и в связанных вопросах, состоит в том, чтобы установить дескриптор дочернего файла стандартного вывода на неблокирующее, а затем опрашивать его в каком-то цикле выбора. В эти дни, конечно, этот цикл предоставляется Asyncio.

Ответ 27

Вот модуль, который поддерживает неблокирующие чтения и записи фона в python:

https://pypi.python.org/pypi/python-nonblock

Предоставляет функцию,

nonblock_read, который будет считывать данные из потока, если он доступен, в противном случае возвращает пустую строку (или None, если поток закрыт с другой стороны, и все возможные данные были прочитаны)

Вы также можете рассмотреть модуль python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

который добавляется к модулю подпроцесса. Таким образом, на объект, возвращенный из "subprocess.Popen" добавлен дополнительный метод runInBackground. Это запускает поток и возвращает объект, который будет автоматически заполнен, поскольку материал записывается в stdout/stderr, не блокируя основной поток.

Наслаждайтесь!