Подтвердить что ты не робот

В Python, как узнать, когда процесс завершен?

Внутри графического интерфейса Python (PyGTK) я запускаю процесс (используя многопроцессорность). Процесс занимает много времени (~ 20 минут), чтобы закончить. Когда процесс завершен, я хотел бы очистить его (извлечь результаты и присоединиться к процессу). Как узнать, когда процесс завершился?

Мой коллега предложил цикл занятости в родительском процессе, который проверяет, завершился ли дочерний процесс. Конечно, есть лучший способ.

В Unix, когда процесс разветвляется, обработчик сигнала вызывается из родительского процесса, когда дочерний процесс завершен. Но я не вижу ничего подобного в Python. Я что-то пропустил?

Как получается, что конец дочернего процесса можно наблюдать из родительского процесса? (Конечно, я не хочу вызывать Process.join(), поскольку это застыло бы интерфейс GUI.)

Этот вопрос не ограничивается многопроцессорной обработкой: у меня точно такая же проблема с многопоточным.

4b9b3361

Ответ 1

Этот ответ очень прост! (Мне просто потребовалось несколько дней, чтобы разобраться.)

В сочетании с PyGTK idle_add() вы можете создать AutoJoiningThread. Общий код является тривиальным:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

Если вы хотите сделать больше, чем просто присоединиться (например, собирать результаты), вы можете расширить вышеуказанный класс, чтобы испускать сигналы при завершении, как это сделано в следующем примере:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Результат приведенного выше примера будет зависеть от порядка выполнения потоков, но он будет похож на:

Creating child
Creating thread
Starting thread
Child starting to play.
 Child playing.
Running mainloop (Ctrl+C to exit)
Child playing.
Child playing.
Child finished playing.
Called Thread.join()
The result was 42
^CReceived KeyboardInterrupt.  Quiting.

Невозможно создать AutoJoiningProcess таким же образом (потому что мы не можем вызывать idle_add() для двух разных процессов), однако мы можем использовать AutoJoiningThread, чтобы получить то, что хотим:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

Чтобы продемонстрировать AutoJoiningProcess, вот еще один пример:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __name__ == '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Результирующий результат будет очень похож на приведенный выше пример, за исключением того, что на этот раз у нас есть как соединение процесса, так и сопутствующий поток:

Creating child
Creating thread
Starting thread
Running mainloop (Ctrl+C to exit)
 Child starting to play.
Child playing.
Child playing.
Child playing.
Child finished playing.
Called Process.join()
The result was [42]
Called Thread.join()
^CReceived KeyboardInterrupt.  Quiting.

К сожалению:

  • Это решение зависит от gobject, из-за использования idle_add(). gobject используется PyGTK.
  • Это не настоящие отношения родитель/ребенок. Если один из этих потоков запущен другим потоком, то он тем не менее будет объединен потоком, выполняющим mainloop, а не родительский поток. Эта проблема справедлива и для AutoJoiningProcess, за исключением того, что я предполагаю, что будет выбрано исключение.

Таким образом, чтобы использовать этот подход, было бы лучше всего создавать потоки/процессы только из mainloop/GUI.

Ответ 2

Я думаю, что в качестве части создания многоплатформенной платформы python простые вещи, такие как SIGCHLD, должны выполняться сами. Согласитесь, это немного больше работы, когда все, что вы хотите сделать, это знать, когда ребенок сделан, но на самом деле это не так больно. Рассмотрим следующее, которое использует дочерний процесс для выполнения работы, два экземпляра multiprocessing.Event и поток для проверки выполнения дочернего процесса:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __name__ == '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

ИЗМЕНИТЬ

Соединение со всеми процессами и созданными потоками является хорошей практикой, потому что это поможет указать, когда создаются процессы/потоки зомби (никогда не заканчивающиеся). Я изменил приведенный выше код, создав класс ChildChecker, который наследует от threading.Thread. Единственная цель - начать работу в отдельном процессе, дождаться завершения этого процесса, а затем уведомить GUI, когда все будет завершено. Присоединение к ChildChecker также присоединится к процессу, который он "проверяет". Теперь, если процесс не соединяется через 5 секунд, поток будет принудительно завершать процесс. Ввод "y" создает запуск дочернего процесса с запуском "endlessChildsPlay", который должен демонстрировать завершение работы.

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __name__ == '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"

Ответ 3

Вы можете использовать очередь для связи с дочерними процессами. Вы можете наложить на него промежуточные результаты или сообщения, указывающие, что контрольные точки были удалены (для индикаторов выполнения) или просто сообщение, указывающее, что процесс готов к объединению. Опрос с помощью empty выполняется легко и быстро.

Если вы действительно хотите узнать, если это будет сделано, вы можете посмотреть exitcode своего процесса или опроса is_alive().

Ответ 4

В моих попытках найти ответ на свой вопрос, я наткнулся на PyGTK idle_add() function. Это дает мне следующую возможность:

  • Создайте новый дочерний процесс, который связывается через очередь.
  • Создайте поток прослушивателя, который прослушивает очередь, когда дочерний процесс отправляет слушателю сообщение о том, что оно завершено, слушатель вызывает idle_add(), который устанавливает обратный вызов.
  • В следующий раз вокруг основного цикла родительский процесс вызовет обратный вызов.
  • Обратный вызов может извлекать результаты, присоединяться к дочернему процессу и присоединяться к потоку-слушателю.

Это кажется чрезмерно сложным способом воссоздать Unix call-callback-when-child-process-is-done.

Это должна быть общая проблема с GUI в Python. Разумеется, существует стандартный шаблон для решения этой проблемы?

Ответ 5

посмотрите модуль подпроцесса:

http://docs.python.org/library/subprocess.html

import subprocess
let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
allText = pipe.stdout.read()
pipe.wait()
retVal = pipe.returncode