Подтвердить что ты не робот

Мимизация glib.spawn_async с помощью Popen...

Функция glib.spawn_async позволяет подключить три обратных вызова, вызываемых в событии на stdout, stderr, и в процессе завершение.

Как я могу имитировать те же функции с помощью subprocess с нитями или асинхронными вызовами?

Меня больше интересуют функциональность, а не threading/asynio, но ответ, который содержит оба, будет зарабатывать щедрость.

Вот игрушечная программа, которая показывает, что я хочу сделать:

import glib
import logging
import os
import gtk


class MySpawn(object):
    def __init__(self):
        self._logger = logging.getLogger(self.__class__.__name__)

    def execute(self, cmd, on_done, on_stdout, on_stderr):
        self.pid, self.idin, self.idout, self.iderr = \
            glib.spawn_async(cmd,
                             flags=glib.SPAWN_DO_NOT_REAP_CHILD,
                             standard_output=True,
                             standard_error=True)
        fout = os.fdopen(self.idout, "r")
        ferr = os.fdopen(self.iderr, "r")
        glib.child_watch_add(self.pid, on_done)
        glib.io_add_watch(fout, glib.IO_IN, on_stdout)
        glib.io_add_watch(ferr, glib.IO_IN, on_stderr)
        return self.pid


if __name__ == '__main__':
    logging.basicConfig(format='%(thread)d %(levelname)s:  %(message)s',
                        level=logging.DEBUG)
    cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split()

    def on_done(pid, retval, *args):
        logging.info("That all folks!…")

    def on_stdout(fobj, cond):
        """This blocks which is fine for this toy example…"""
        for line in fobj.readlines():
            logging.info(line.strip())
        return True

    def on_stderr(fobj, cond):
        """This blocks which is fine for this toy example…"""
        for line in fobj.readlines():
            logging.error(line.strip())
        return True

    runner = MySpawn()
    runner.execute(cmd, on_done, on_stdout, on_stderr)
    try:
        gtk.main()
    except KeyboardInterrupt:
        print('')

Я должен добавить, что, поскольку readlines() блокирует, вышесказанное буферизует весь вывод и отправляет его сразу. Если это не то, что нужно, тогда вы должны использовать readline() и убедитесь, что по окончании команды вы закончите чтение всех строк, которые вы ранее не читали.

4b9b3361

Ответ 1

asyncio имеет subprocess_exec, нет необходимости использовать модуль подпроцесса вообще:

import asyncio

class Handler(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        # fd == 1 for stdout, and 2 for stderr
        print("Data from /bin/ls on fd %d: %s" % (fd, data.decode()))

    def pipe_connection_lost(self, fd, exc):
        print("Connection lost to /bin/ls")

    def process_exited(self):
        print("/bin/ls is finished.")

loop = asyncio.get_event_loop()
coro = loop.subprocess_exec(Handler, "/bin/ls", "/")

loop.run_until_complete(coro)
loop.close()

С подпроцессом и потоками это просто. Вы можете просто создать поток для каждого потока, а один - wait() для процесса:

import subprocess
import threading

class PopenWrapper(object):
    def __init__(self, args):
       self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL)

       self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,))
       self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,))
       self.exit_watcher = threading.Thread(target=self._exit_watcher)

       self.stdout_reader_thread.start()
       self.stderr_reader_thread.start()
       self.exit_watcher.start()

    def _reader(self, fileobj):
        for line in fileobj:
            self.on_data(fileobj, line)

    def _exit_watcher(self):
        self.process.wait()
        self.stdout_reader_thread.join()
        self.stderr_reader_thread.join()
        self.on_exit()

    def on_data(self, fd, data):
        return NotImplementedError

    def on_exit(self):
        return NotImplementedError

    def join(self):
        self.process.wait()

class LsWrapper(PopenWrapper):
    def on_data(self, fd, data):
        print("Received on fd %r: %s" % (fd, data))

    def on_exit(self):
        print("Process exited.")


LsWrapper(["/bin/ls", "/"]).join()

Однако помните, что glib не использует потоки для асинхронного выполнения ваших обратных вызовов. Он использует цикл событий, как это делает asyncio. Идея состоит в том, что в основе вашей программы лежит цикл, который ждет, пока что-то не произойдет, а затем синхронно выполняет связанный обратный вызов. В вашем случае эти "данные становятся доступными для чтения на одной из труб" и "подпроцесс вышел". В общем, это также такие вещи, как "X11-сервер, сообщаемое движение мыши", "входящий сетевой трафик" и т.д. Вы можете эмулировать поведение glib, написав собственный цикл событий. Используйте select module на двух трубах. Если select сообщает, что каналы доступны для чтения, но read не возвращает никаких данных, процесс, скорее всего, вышел из него - вызовите метод poll() для объекта подпроцесса в этом случае, чтобы проверить, завершено ли оно, и вызовите обратный вызов exit, если он, или обратный вызов ошибки также.