Я тестирую подпроцессы конвейеров с помощью python. Я знаю, что я могу делать то, что программы ниже делают в python напрямую, но это не главное. Я просто хочу проверить конвейер, чтобы я знал, как его использовать.
Моя система - Linux Ubuntu 9.04 с python 2.6 по умолчанию.
Я начал с этого документа .
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
Это работает, но поскольку p1
stdin
не перенаправляется, я должен ввести материал в терминал для подачи канала. Когда я печатаю ^D
закрытие stdin, я получаю вывод, который я хочу.
Тем не менее, я хочу отправить данные в канал с помощью строковой переменной python. Сначала я пробовал писать на stdin:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
Не работает. Я попытался использовать p2.stdout.read()
вместо этого в последней строке, но также блокирует. Я добавил p1.stdin.flush()
и p1.stdin.close()
, но это тоже не сработало. Затем я переехал, чтобы сообщить:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
Так что это еще не так.
Я заметил, что выполнение одного процесса (например, p1
выше, удаление p2
) работает отлично. И работает файл дескриптор p1
(stdin=open(...)
). Поэтому проблема заключается в следующем:
Можно ли передавать данные в конвейер из двух или более подпроцессов в python без блокировки? Почему бы и нет?
Я знаю, что могу запустить оболочку и запустить конвейер в оболочке, но это не то, что я хочу.
ОБНОВЛЕНИЕ 1: ниже подсказки Аарона Дигуллы ниже. Теперь я пытаюсь использовать потоки, чтобы заставить его работать.
Сначала я попытался запустить p1.communicate в потоке.
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
Хорошо, не получилось. Попробовал другие комбинации, например, изменить его на .write()
, а также p2.read()
. Ничего. Теперь попробуйте противоположный подход:
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
код где-то блокируется. Либо в порожденной нити, либо в основном потоке, либо в обоих. Так что это не сработало. Если вы знаете, как заставить его работать, это облегчится, если вы сможете предоставить рабочий код. Я пытаюсь здесь.
ОБНОВЛЕНИЕ 2
Paul Du Bois ответил ниже с некоторой информацией, поэтому я сделал больше тестов.
Я прочитал весь модуль subprocess.py
и понял, как он работает. Поэтому я попытался применить именно это для кода.
Я нахожусь в linux, но, поскольку я тестировал потоки, мой первый подход заключался в том, чтобы реплицировать точный код потокового окна, видимый по методу subprocess.py
communicate()
, но для двух процессов вместо одного. Здесь весь список того, что я пробовал:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
Ну. Это не сработало. Даже после того, как был вызван p1.stdin.close()
, p2.stdout.read()
все еще блокируется.
Затем я попробовал код posix на subprocess.py
:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
Также блокируется на select.select()
. Распространяя print
вокруг, я узнал об этом:
- Чтение работает. Код читается много раз во время выполнения.
- Написание также работает. Данные записываются в
p1.stdin
. - В конце
numwrites
вызываетсяp1.stdin.close()
. - Когда
select()
начинает блокировку, толькоto_read
имеет что-то,p2.stdout
.to_write
уже пуст. -
os.read()
call всегда возвращает что-то, поэтомуp2.stdout.close()
никогда не вызывается.
Заключение из обоих тестов: закрытие stdin
первого процесса на конвейере (grep
в этом примере) не заставляет его сбросить свой буферный вывод в следующий и умереть.
Нет способа заставить его работать?
PS: Я не хочу использовать временный файл, я уже тестировал файлы и знаю, что он работает. И я не хочу использовать окна.