Подтвердить что ты не робот

Обрабатывать большое количество данных для stdin при использовании подпроцесса. Popen

Я как бы пытаюсь понять, что такое python для решения этой простой проблемы.

Моя проблема довольно проста. Если вы используете следующий код, он будет висеть. Это хорошо документировано в документе subprocess doc.

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
for i in range(100000):
    proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output

Поиск решения (есть очень проницательный поток, но я потерял его сейчас). Я нашел это решение (среди прочих), которое использует явную fork:

import os
import sys
from subprocess import Popen, PIPE

def produce(to_sed):
    for i in range(100000):
        to_sed.write("%d\n" % i)
        to_sed.flush()
    #this would happen implicitly, anyway, but is here for the example
    to_sed.close()

def consume(from_sed):
    while 1:
        res = from_sed.readline()
        if not res:
            sys.exit(0)
            #sys.exit(proc.poll())
        print 'received: ', [res]

def main():
    proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
    to_sed = proc.stdin
    from_sed = proc.stdout

    pid = os.fork()
    if pid == 0 :
        from_sed.close()
        produce(to_sed)
        return
    else :
        to_sed.close()
        consume(from_sed)

if __name__ == '__main__':
    main()

В то время как это решение концептуально очень легко понять, оно использует еще один процесс и задерживается как слишком низкий уровень по сравнению с модулем подпроцесса (это просто для того, чтобы скрыть такие вещи...).

Мне интересно: существует ли простое и чистое решение с использованием модуля подпроцесса, который не будет висели или для реализации этого теста. Мне нужно сделать шаг назад и реализовать цикл выбора в старом стиле или явный fork?

Спасибо

4b9b3361

Ответ 1

Если вы хотите чистое решение Python, вам нужно поместить либо читателя, либо автора в отдельный поток. Пакет threading - это легкий способ сделать это, с удобным доступом к обычным объектам и бесполезным форкированием.

import subprocess
import threading
import sys

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
def writer():
    for i in range(100000):
        proc.stdin.write('%d\n' % i)
    proc.stdin.close()
thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
    sys.stdout.write(line)
thread.join()
proc.wait()

Может показаться, что модуль subprocess модернизирован для поддержки потоков и сопрограмм, что позволит конструировать трубопроводы, которые смешивают кусочки Python и куски оболочки, более элегантно.

Ответ 2

Если вы не хотите хранить все данные в памяти, вам нужно использовать select. Например. что-то вроде:

import subprocess
from select import select
import os

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

i = 0;
while True:
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        print out,
        if not out:
            break
    if proc.stdin in wlist:
        proc.stdin.write('%d\n' % i)
        i += 1
        if i >= 100000:
            proc.stdin.close()

Ответ 3

Здесь что-то я использовал для загрузки загружаемых файлов mysql 6G через подпроцесс. Держитесь подальше от оболочки = True. Небезопасно и начать с ресурсов, тратящих ресурсы.

import subprocess

fhandle = None

cmd = [mysql_path,
      "-u", mysql_user, "-p" + mysql_pass],
      "-h", host, database]

fhandle = open(dump_file, 'r')
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

(stdout,stderr) = p.communicate()

fhandle.close()

Ответ 4

Для такого рода вещей оболочка работает намного лучше, чем подпроцесс.

Напишите очень простые приложения Python, которые читаются с sys.stdin и пишут sys.stdout.

Соедините простые приложения вместе с конвейером оболочки.

Если вы хотите, запустите конвейер с помощью subprocess или просто напишите однострочную оболочку script.

python part1.py | python part2.py

Это очень, очень эффективно. Он также переносится на все Linux (и Windows), пока вы держите его очень просто.

Ответ 5

Ваши блокировки кода, как только cat stdout OS buffer buffer заполнен. Если вы используете stdout=PIPE; вы должны использовать его вовремя, иначе может возникнуть тупик, как в вашем случае.

Если вам не нужен вывод во время выполнения процесса; вы можете перенаправить его во временный файл:

#!/usr/bin/env python3
import subprocess
import tempfile

with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(['cat'],
                          stdin=subprocess.PIPE,
                          stdout=output_file,
                          universal_newlines=True) as process:
        for i in range(100000):
            print(i, file=process.stdin)
    output_file.seek(0)  # rewind (and sync with the disk)
    print(output_file.readline(), end='')  # get  the first line of the output

Если вход/выход мал (поместится в память); вы можете передать вход все сразу и получить вывод сразу, используя .communicate(), который одновременно читает/записывает:

#!/usr/bin/env python3
import subprocess

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
                    stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1]) # print the last line

Чтобы читать/писать одновременно вручную, вы можете использовать потоки, asyncio, fcntl и т.д. @Jed предоставил простой поток-решение. Здесь решение на основе asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE

async def pump_input(writer):
     try:
         for i in range(100000):
             writer.write(b'%d\n' % i)
             await writer.drain()
     finally:
         writer.close()

async def run():
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(pump_input(process.stdin)) # write input
    async for line in process.stdout: # consume output
        print(int(line)**2) # print squares
    return await process.wait()  # wait for the child process to exit


if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

В Unix вы можете использовать решение на основе fcntl:

#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF

def make_blocking(pipe, blocking=True):
    fd = pipe.fileno()
    if not blocking:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK
    else:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            #NOTE: the mode is block-buffered (default) and therefore
            # `cat` won't see it immidiately
            process.stdin.write(b'%d\n' % i)
            # a deadblock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            if output is not None:
                sys.stdout.buffer.write(output)
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)

Ответ 6

Вот пример (Python 3) чтения одной записи за раз из gzip с использованием протокола:

cmd = 'gzip -dc compressed_file.gz'
pipe = Popen(cmd, stdout=PIPE).stdout

for line in pipe:
    print(":", line.decode(), end="")

Я знаю, что для этого есть стандартный модуль, это просто пример. Вы можете прочитать весь вывод за один раз (например, обратно-тики оболочки) с использованием метода связи, но, очевидно, вы должны быть осторожны с размером памяти.

Вот пример (снова Python 3) записи записей в программу lp (1) в Linux:

cmd = 'lp -'
proc = Popen(cmd, stdin=PIPE)
proc.communicate(some_data.encode())

Ответ 7

Теперь я знаю, что это не будет полностью удовлетворять вас пуристом, так как вход должен будет поместиться в память, и у вас нет возможности работать в интерактивном режиме с вводом-выводом, но по крайней мере это отлично работает на вашем примере, Метод связи необязательно принимает входные данные в качестве аргумента, и если вы подаете свой процесс таким образом, он будет работать.

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )

input = "".join('{0:d}\n'.format(i) for i in range(100000))
output = proc.communicate(input)[0]
print output

Что касается более крупной проблемы, вы можете подклассифицировать Popen, переписать __init__, чтобы принять потоковые объекты в качестве аргументов в stdin, stdout, stderr и переписать метод _communicate (волосатый для кроссплатформенности, вам нужно это сделать дважды, см. источник subprocess.py), чтобы вызвать read() в потоке stdin и записать() вывод в потоки stdout и stderr. Меня беспокоит такой подход: насколько я знаю, это еще не сделано. Когда очевидные вещи раньше не делались, обычно есть причина (она не работает должным образом), но я не понимаю, почему это не так, кроме того, что потоки должны быть потокобезопасными в Windows.

Ответ 8

Использование aiofiles и asyncio в python 3.5:

Немного сложный, но для записи в stdin требуется только 1024 байта!

import asyncio
import aiofiles
import sys
from os.path import dirname, join, abspath
import subprocess as sb


THIS_DIR = abspath(dirname(__file__))
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4')
DEST_PATH = '/home/vahid/Desktop/sample.mp4'


async def async_file_reader(f, buffer):
    async for l in f:
        if l:
            buffer.append(l)
        else:
            break
    print('reader done')


async def async_file_writer(source_file, target_file):
    length = 0
    while True:
        input_chunk = await source_file.read(1024)
        if input_chunk:
            length += len(input_chunk)
            target_file.write(input_chunk)
            await target_file.drain()
        else:
            target_file.write_eof()
            break

    print('writer done: %s' % length)


async def main():
    dir_name = dirname(DEST_PATH)
    remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH)

    stdout, stderr = [], []
    async with aiofiles.open(SAMPLE_FILE, mode='rb') as f:
        cmd = await asyncio.create_subprocess_shell(
            remote_cmd,
            stdin=sb.PIPE,
            stdout=sb.PIPE,
            stderr=sb.PIPE,
        )

        await asyncio.gather(*(
            async_file_reader(cmd.stdout, stdout),
            async_file_reader(cmd.stderr, stderr),
            async_file_writer(f, cmd.stdin)
        ))

        print('EXIT STATUS: %s' % await cmd.wait())

    stdout, stderr = '\n'.join(stdout), '\n'.join(stderr)

    if stdout:
        print(stdout)

    if stderr:
        print(stderr, file=sys.stderr)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Результат:

writer done: 383631
reader done
reader done
EXIT STATUS: 0

Ответ 9

Простейшее решение, о котором я могу думать:

from subprocess import Popen, PIPE
from threading import Thread

s = map(str,xrange(10000)) # a large string
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1)
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start()
print (p.stdout.read())

Буферизованная версия:

from subprocess import Popen, PIPE
from threading import Thread

s = map(str,xrange(10000)) # a large string
n = 1024 # buffer size
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n)
Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start()
print (p.stdout.read())

Ответ 10

Я искал примерный код для постепенного итерации процесса по мере того, как этот процесс потребляет свой вход от предоставленного итератора (поэтапно также). В основном:

import string
import random

# That what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    pass

# Returns iterable over n random strings. n is assumed to be infinity if negative.
# Just an example of function that returns potentially unlimited number of lines.
def random_lines(n, M=8):
    while 0 != n:
        yield "".join(random.choice(string.letters) for _ in range(M))
        if 0 < n:
            n -= 1

# That what I consider to be a very convenient use case for
# function proposed above.
def print_many_uniq_numbered_random_lines():
    i = 0
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
        # Key idea here is that `process_line_reader` will feed random lines into
        # `uniq` process stdin as lines are consumed from returned iterable.
        print "#%i: %s" % (i, line)
        i += 1

Некоторые предлагаемые здесь решения позволяют делать это с помощью потоков (но это не всегда удобно) или с asyncio (что недоступно в Python 2.x). Ниже приведен пример рабочей реализации, которая позволяет это сделать.

import subprocess
import os
import fcntl
import select

class nonblocking_io(object):
    def __init__(self, f):
        self._fd = -1
        if type(f) is int:
            self._fd = os.dup(f)
            os.close(f)
        elif type(f) is file:
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accept file objects or interger file descriptors")
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    def __enter__(self):
        return self
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def fileno(self):
        return self._fd
    def close(self):
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1

class nonblocking_line_writer(nonblocking_io):
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._buffer_offset = 0
        self._buffer = bytearray()
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)
    # Returns False when `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            if self._buffer_offset < len(self._buffer):
                n = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += n
                if self._buffer_offset < len(self._buffer):
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            self._buffer[:] = []
            self._buffer_offset = 0
            while len(self._buffer) < self._buffer_size:
                line = next(self._lines, None)
                if line is None:
                    self._lines_ended = True
                    break
                self._buffer.extend(bytearray(line, self._encoding))
                self._buffer.extend(self._linesep)

class nonblocking_line_reader(nonblocking_io):
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._encoding = encoding
        self._file_ended = False
        self._line_part = ""
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            data = os.read(self._fd, self._buffer_size)
            if 0 == len(data):
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = ""
                break
            for line in data.splitlines(True):
                self._line_part += line
                if self._line_part.endswith(("\n", "\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = ""
            if len(data) < self._buffer_size:
                break
        return (lines, not self._file_ended)

class process_line_reader(object):
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()
    def __iter__(self):
        return self._iterator
    def __enter__(self):
        return self._iterator
    def __exit__(self, type, value, traceback):
        self.close()
        return False
    def _communicate(self):
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error, e:
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()
    def lines(self):
        return self._iterator
    def close(self):
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None