Подтвердить что ты не робот

Блоки - отправка ввода в подпроцесс python

Я тестирую подпроцессы конвейеров с помощью python. Я знаю, что я могу делать то, что программы ниже делают в python напрямую, но это не главное. Я просто хочу проверить конвейер, чтобы я знал, как его использовать.

Моя система - Linux Ubuntu 9.04 с python 2.6 по умолчанию.

Я начал с этого документа .

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

Это работает, но поскольку p1 stdin не перенаправляется, я должен ввести материал в терминал для подачи канала. Когда я печатаю ^D закрытие stdin, я получаю вывод, который я хочу.

Тем не менее, я хочу отправить данные в канал с помощью строковой переменной python. Сначала я пробовал писать на stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Не работает. Я попытался использовать p2.stdout.read() вместо этого в последней строке, но также блокирует. Я добавил p1.stdin.flush() и p1.stdin.close(), но это тоже не сработало. Затем я переехал, чтобы сообщить:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

Так что это еще не так.

Я заметил, что выполнение одного процесса (например, p1 выше, удаление p2) работает отлично. И работает файл дескриптор p1 (stdin=open(...)). Поэтому проблема заключается в следующем:

Можно ли передавать данные в конвейер из двух или более подпроцессов в python без блокировки? Почему бы и нет?

Я знаю, что могу запустить оболочку и запустить конвейер в оболочке, но это не то, что я хочу.


ОБНОВЛЕНИЕ 1: ниже подсказки Аарона Дигуллы ниже. Теперь я пытаюсь использовать потоки, чтобы заставить его работать.

Сначала я попытался запустить p1.communicate в потоке.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Хорошо, не получилось. Попробовал другие комбинации, например, изменить его на .write(), а также p2.read(). Ничего. Теперь попробуйте противоположный подход:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

код где-то блокируется. Либо в порожденной нити, либо в основном потоке, либо в обоих. Так что это не сработало. Если вы знаете, как заставить его работать, это облегчится, если вы сможете предоставить рабочий код. Я пытаюсь здесь.


ОБНОВЛЕНИЕ 2

Paul Du Bois ответил ниже с некоторой информацией, поэтому я сделал больше тестов. Я прочитал весь модуль subprocess.py и понял, как он работает. Поэтому я попытался применить именно это для кода.

Я нахожусь в linux, но, поскольку я тестировал потоки, мой первый подход заключался в том, чтобы реплицировать точный код потокового окна, видимый по методу subprocess.py communicate(), но для двух процессов вместо одного. Здесь весь список того, что я пробовал:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Ну. Это не сработало. Даже после того, как был вызван p1.stdin.close(), p2.stdout.read() все еще блокируется.

Затем я попробовал код posix на subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Также блокируется на select.select(). Распространяя print вокруг, я узнал об этом:

  • Чтение работает. Код читается много раз во время выполнения.
  • Написание также работает. Данные записываются в p1.stdin.
  • В конце numwrites вызывается p1.stdin.close().
  • Когда select() начинает блокировку, только to_read имеет что-то, p2.stdout. to_write уже пуст.
  • os.read() call всегда возвращает что-то, поэтому p2.stdout.close() никогда не вызывается.

Заключение из обоих тестов: закрытие stdin первого процесса на конвейере (grep в этом примере) не заставляет его сбросить свой буферный вывод в следующий и умереть.

Нет способа заставить его работать?

PS: Я не хочу использовать временный файл, я уже тестировал файлы и знаю, что он работает. И я не хочу использовать окна.

4b9b3361

Ответ 1

Я узнал, как это сделать.

Это не о потоках, а не о select().

Когда я запускаю первый процесс (grep), он создает два низкоуровневых дескриптора файла, по одному для каждого канала. Позволяет называть те a и b.

Когда я запускаю второй процесс, b переходит к cut sdtin. Но на Popen - close_fds=False есть мертвый по умолчанию мозг.

Эффект от этого заключается в том, что cut также наследует a. Таким образом, grep не может умереть, даже если я закрываю a, потому что stdin все еще открыт в процессе cut (cut игнорирует его).

Следующий код теперь отлично работает.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True ДОЛЖЕН БЫТЬ ПО УМОЛЧАНИЮ в системах unix. В окнах он закрывает все fds, поэтому он предотвращает создание трубопроводов.

EDIT:

PS: Для людей с аналогичной проблемой, читающих этот ответ: Как сказал pooryorick в комментарии, это также может блокировать, если данные, записанные в p1.stdin, больше, чем буферы. В этом случае вы должны вырезать данные на более мелкие фрагменты и использовать select.select(), чтобы знать, когда читать/писать. Код в вопросе должен дать нам подсказку о том, как реализовать это.

EDIT2: нашел другое решение с большей помощью от pooryorick - вместо использования close_fds=True и закрытия ALL fds можно закрыть fd, принадлежащий первому процессу, при выполнении во-вторых, и он будет работать. Закрытие должно выполняться у ребенка, поэтому функция preexec_fn из Popen очень удобна для этого. При выполнении p2 вы можете:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

Ответ 2

Работа с большими файлами

Два принципа должны применяться единообразно при работе с большими файлами в Python.

  • Поскольку любая процедура IO может блокироваться, мы должны поддерживать каждый этап конвейера в другом потоке или обрабатывать. В этом примере мы используем потоки, но подпроцессы позволят вам избежать GIL.
  • Мы должны использовать инкрементные чтения и записывать, чтобы мы не дождались EOF, прежде чем начинать делать прогресс.

Альтернативой является использование неблокирующего IO, хотя это громоздко в стандартном Python. См. gevent для легкой библиотеки потоков, которая реализует синхронный IO API с помощью неблокирующих примитивов.

Пример кода

Мы построим глупый конвейер, который примерно равен

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

где каждый этап в фигурных скобках {} реализуется в Python, а остальные используют стандартные внешние программы. TL; DR: Посмотрите этот метод.

Начнем с ожидаемого импорта.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Этапы Python конвейера

Все, кроме последнего этапа реализации конвейера, содержащего Python, должны идти в потоке, чтобы он IO не блокировал другие. Они могли вместо этого работать в подпроцессах Python, если вы хотели, чтобы они фактически выполнялись параллельно (избегайте GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Каждый из них должен быть помещен в собственный поток, который мы будем использовать с помощью этой удобной функции.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Создать конвейер

Создайте внешние этапы с помощью Popen и этапов Python с помощью spawn. Аргумент bufsize=-1 говорит, что использует буферизацию по умолчанию системы (обычно 4 kiB). Это обычно быстрее, чем стандартная (небуферизованная) или буферизация строк, но вам понадобится буферизация строк, если вы хотите визуально контролировать вывод без задержек.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Привод конвейера

Собранный, как указано выше, все буферы в конвейере будут заполняться, но поскольку никто не читает с конца (grepz.stdout), все они будут блокироваться. Мы могли бы прочитать все это в одном вызове grepz.stdout.read(), но это использовало бы большую память для больших файлов. Вместо этого мы читаем инкрементно.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

Потоки и процессы очищаются, как только они достигают EOF. Мы можем явно очистить, используя

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 и ранее

Внутренне, subprocess.Popen вызывает fork, настраивает дескрипторы файла pipe и вызывает exec. В дочернем процессе из fork есть копии всех дескрипторов файлов в родительском процессе, и обе копии должны быть закрыты до того, как соответствующий читатель получит EOF. Это можно устранить путем ручного закрытия труб (либо close_fds=True или подходящего аргумента preexec_fn до subprocess.Popen), либо установив флаг FD_CLOEXEC, чтобы иметь exec автоматически закрыть дескриптор файла. Этот флаг устанавливается автоматически в Python-2.7 и более поздних версиях, см. issue12786. Мы можем получить поведение Python-2.7 в более ранних версиях Python, вызвав

p._set_cloexec_flags(p.stdin)

перед передачей p.stdin в качестве аргумента для последующего subprocess.Popen.

Ответ 3

Существует три основных способа заставить трубы работать как ожидалось

  • Убедитесь, что каждый конец трубы используется в другом потоке/процессе (некоторые из примеров в верхней части страдают от этой проблемы).

  • явно закрыть неиспользуемый конец канала в каждом процессе

  • обрабатывать буферизацию, отключив его (параметр Python -u), используя pty или просто заполнить буфер тем, что не повлияет на data (возможно, "\n", но все, что подходит).

Примеры в модуле "конвейер" Python (я автор) соответствуют вашему сценарию точно, и сделать шаги низкого уровня достаточно ясными.

http://pypi.python.org/pypi/pipeline/

Совсем недавно я использовал модуль подпроцесса как часть шаблон производителя-процессора-потребителя-регулятора:

http://www.darkarchive.org/w/Pub/PythonInteract

Этот пример относится к буферизированному stdin, не прибегая к использованию pty, и также показывает, какие концы труб должны быть закрыты, где. Я предпочитаю процессы для нить, но принцип тот же. Кроме того, это иллюстрирует синхронизирующие очереди, на которые подают производителя и собирают информацию от потребителя, и как закрыть их чисто (обратите внимание на часовики, вставленные в Очереди). Этот шаблон позволяет создавать новые входные данные на основе последних результатов, позволяя рекурсивное обнаружение и обработку.

Ответ 4

Предлагаемое решение Nosklo быстро сломается, если на приемный конец трубы будет записано слишком много данных:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

Если этот script не зависает на вашем компьютере, просто увеличьте "20000" на то, что превышает размер буферов для вашей операционной системы.

Это связано с тем, что операционная система выполняет буферизацию ввода на "grep", но как только этот буфер заполнен, вызов p1.stdin.write будет блокироваться до тех пор, пока что-то не прочитает из p2.stdout. В игрушечных сценариях вы можете найти способ записи/чтения из трубы в том же процессе, но при нормальном использовании необходимо писать из одного потока/процесса и читать из отдельного потока/процесса. Это верно для subprocess.popen, os.pipe, os.popen * и т.д.

Другим поворотным моментом является то, что иногда вы хотите продолжать кормить трубу элементами, полученными из более раннего выхода того же самого канала. Решение состоит в том, чтобы сделать как податчик труб, так и считыватель труб асинхронным по отношению к мужской программе и реализовать две очереди: одну между основной программой и податчиком трубопровода и одну между основной программой и считывателем каналов. PythonInteract является примером этого.

Подпроцесс - хорошая модель удобства, но поскольку он скрывает детали вызовов os.popen и os.fork, которые он делает под капотом, иногда бывает сложнее иметь дело с более низкими вызовами, которые он использует. По этой причине подпроцесс не является хорошим способом узнать, как действительно работают межпроцессные каналы.

Ответ 5

Вы должны сделать это в нескольких потоках. В противном случае вы окажетесь в ситуации, когда вы не можете отправить данные: child p1 не будет читать ваш вход, так как p2 не считывает вывод p1, потому что вы не читаете вывод p2.

Итак, вам нужен фоновый поток, который читает, что записывает p2. Это позволит p2 продолжить работу после записи некоторых данных в канал, поэтому он может прочитать следующую строку ввода из p1, которая снова позволяет p1 обрабатывать данные, которые вы отправляете на него.

В качестве альтернативы вы можете отправить данные на p1 с фоновым потоком и прочитать вывод из p2 в основном потоке. Но обе стороны должны быть потоком.

Ответ 6

Отвечая на утверждение nosklo (см. другие комментарии к этому вопросу), что это невозможно сделать без close_fds=True:

close_fds=True необходим только в том случае, если вы оставили другой файл дескрипторы открыты. При открытии нескольких дочерних процессов всегда полезно отслеживать открытые файлы, которые могут наследоваться, и явно закрывать любые которые не нужны:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds по умолчанию False, потому что подпроцесс предпочитает доверять вызывающей программе, чтобы знать, что она делает с открытым файлом дескрипторы и просто предоставить вызывающему абоненту простой вариант закрыть их все если это то, что он хочет сделать.

Но реальная проблема заключается в том, что буферы для труб будут кусать вас для всех, кроме игрушечных примеров. Как я уже сказал в своих других ответах на этот вопрос, эмпирическое правило не открывайте ваш читатель и ваш писатель в том же процессе/потоке. Кто угодно кто хочет использовать модуль подпроцесса для двусторонней связи, будет хорошо служил для изучения os.pipe и os.fork, во-первых. На самом деле это не так трудно использовать, если у вас есть хороший пример, чтобы посмотреть.

Ответ 7

Я думаю, вы можете исследовать неправильную проблему. Конечно, как говорит Аарон, если вы попытаетесь стать как продюсером до начала трубопровода, так и потребителем конца конвейера, легко попасть в тупик. Это проблема, с которой решает связь().

общаться() не совсем корректно для вас, поскольку stdin и stdout находятся на разных объектах подпроцесса; но если вы посмотрите на реализацию в subprocess.py, вы увидите, что он делает именно то, что предложил Аарон.

Как только вы увидите, что общаетесь как с чтением, так и с записью, вы увидите, что во второй попытке связь() конкурирует с p2 для вывода p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

Я запускаюсь на win32, который определенно имеет разные характеристики ввода/вывода и буферизации, но это работает для меня:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

Я настроил размер ввода, чтобы создать тупик при использовании наивного unthreaded p2.read()

Вы также можете попробовать выполнить буферизацию в файл, например

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

Это также работает для меня без взаимоблокировок.

Ответ 8

В одном из комментариев выше я бросил вызов nosklo, чтобы либо опубликовать некоторый код, чтобы поддержать его утверждения о select.select, либо перенести мои ответы, которые он ранее проголосовал. Он ответил следующим кодом:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Одной из проблем с этим script является то, что он второй догадывает размер/характер системных буферов. script будет испытывать меньше сбоев, если он сможет удалить магические числа, подобные 1024.

Большая проблема заключается в том, что этот script код работает последовательно с правильным сочетание ввода данных и внешних программ. grep и разрезать обе работы с линий, и поэтому их внутренние буферы ведут себя по-другому. Если мы используем более общую команду, такую ​​как "cat", и записывать меньшие биты данных в трубу, состояние фатальной гонки будет появляться чаще:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

В этом случае проявятся два разных результата:

write, write, close file, read -> success
write, read -> hang

Итак, снова я оспариваю носкло либо почтовому индексу, показывающим использование select.select для обработки произвольного ввода и буферизации каналов из один поток, или чтобы повысить мои ответы.

Нижняя строка: не пытайтесь манипулировать обоими концами трубы из одного потока. Это просто не стоит. Видеть pipeline для хорошего низкоуровневого пример того, как это сделать правильно.

Ответ 9

Как использовать SpooledTemporaryFile? Это обходит (но, возможно, не решает) проблему:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

Вы можете написать ему как файл, но на самом деле это блок памяти.

Или я полностью недопонимаю...

Ответ 10

Вот пример использования Popen вместе с os.fork для выполнения того же вещь. Вместо использования close_fds он просто закрывает трубы на правильные места. Намного проще, чем пытаться использовать select.select, и использует все преимущества буферов системных труб.

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()

Ответ 11

Это намного проще, чем вы думаете!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file, to stdin, like (cat myfile | ./this.py),
#     or type on terminal and hit control+d when done, etc
#   No need to handle this yourself, that why we have shell's!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

Этот код написан для python 3.6 и работает с python 2.7.

Используйте его как:

cat README.md  | python ./example.py

или

python example.py < README.md

Чтобы передать содержимое "README.md" в эту программу.

Но.. на данный момент, почему бы просто не использовать "кошку" напрямую, а вывести на выходе, как хотите? как:

cat filename | grep -v not | cut -c 1-10

введенный в консоль, также выполнит эту работу. Я лично использовал бы только вариант кода, если бы я обрабатывал результат, иначе оболочку script было бы легче поддерживать и сохранять.

Вы просто используете оболочку, чтобы сделать для вас трубопровод. В одном, другой. Это то, что она будет GREAT при выполнении, управление процессами и управление цепочками ввода и вывода одной ширины. Некоторые назвали бы это лучшей неинтерактивной функцией оболочки.