Подтвердить что ты не робот

Tkinter: Как использовать потоки для предотвращения цикла основного события от "замораживания"

У меня есть небольшой тест GUI с кнопкой "Пуск" и панель "Прогресс". Желаемое поведение:

  • Нажмите "Пуск"
  • Progressbar колеблется в течение 5 секунд
  • Индикатор Progressbar останавливается

Наблюдаемое поведение: кнопка "Старт" замерзает в течение 5 секунд, затем отображается индикатор прогресса (без колебаний).

Вот мой код:

class GUI:
    def __init__(self, master):
        self.master = master
        self.test_button = Button(self.master, command=self.tb_click)
        self.test_button.configure(
            text="Start", background="Grey",
            padx=50
            )
        self.test_button.pack(side=TOP)

    def progress(self):
        self.prog_bar = ttk.Progressbar(
            self.master, orient="horizontal",
            length=200, mode="indeterminate"
            )
        self.prog_bar.pack(side=TOP)

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        # Simulate long running process
        t = threading.Thread(target=time.sleep, args=(5,))
        t.start()
        t.join()
        self.prog_bar.stop()

root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()

Основываясь на информации от Bryan Oakley здесь, я понимаю, что мне нужно использовать потоки. Я попытался создать поток, но я предполагаю, что, поскольку поток запускается из основного потока, это не помогает.

У меня возникла идея разместить логическую часть в другом классе и создать экземпляр GUI из этого класса, аналогичный примеру кода A. Rodas здесь.

Мой вопрос:

Я не могу понять, как закодировать его так, чтобы эта команда:

self.test_button = Button(self.master, command=self.tb_click)

вызывает функцию, расположенную в другом классе. Это плохое дело, или это даже возможно? Как создать 2-й класс, который может обрабатывать self.tb_click? Я пробовал следовать примеру кода А. Родаса, который прекрасно работает. Но я не могу понять, как реализовать его решение в случае виджета Button, который запускает действие.

Если я должен обработать поток из одного класса GUI, как создать поток, который не мешает основному потоку?

4b9b3361

Ответ 1

Когда вы присоединитесь к новому потоку в основном потоке, он будет ждать окончания потока, поэтому графический интерфейс будет блокироваться, даже если вы используете многопоточность.

Если вы хотите поместить логическую часть в другой класс, вы можете напрямую подчинить Thread, а затем запустить новый объект этого класса при нажатии кнопки. Конструктор этого подкласса Thread может получить объект Queue, а затем вы сможете передать его с частью GUI. Поэтому мое предложение:

  • Создайте объект Queue в основном потоке
  • Создайте новый поток с доступом к этой очереди
  • Периодически проверяйте очередь в основном потоке

Затем вам нужно решить проблему, что произойдет, если пользователь нажмет на два раза одну и ту же кнопку (она будет порождать новый поток с каждым щелчком), но вы можете исправить это, отключив кнопку запуска и включив ее снова после того, как вы вызов self.prog_bar.stop().

import Queue

class GUI:
    # ...

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        self.queue = Queue.Queue()
        ThreadedTask(self.queue).start()
        self.master.after(100, self.process_queue)

    def process_queue(self):
        try:
            msg = self.queue.get(0)
            # Show result of the task if needed
            self.prog_bar.stop()
        except Queue.Empty:
            self.master.after(100, self.process_queue)

class ThreadedTask(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        time.sleep(5)  # Simulate long running process
        self.queue.put("Task finished")

Ответ 3

Я отправлю основу для альтернативного решения. Это не является специфическим для индикатора прогресса Tk как такового, но это, безусловно, может быть реализовано очень легко для этого.

Вот несколько классов, которые позволяют запускать другие задачи на фоне Tk, при необходимости обновлять элементы управления Tk, а не блокировать gui!

Здесь класс TkRepeatingTask и BackgroundTask:

import threading

class TkRepeatingTask():

    def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
        self.__tk_   = tkRoot
        self.__func_ = taskFuncPointer        
        self.__freq_ = freqencyMillis
        self.__isRunning_ = False

    def isRunning( self ) : return self.__isRunning_ 

    def start( self ) : 
        self.__isRunning_ = True
        self.__onTimer()

    def stop( self ) : self.__isRunning_ = False

    def __onTimer( self ): 
        if self.__isRunning_ :
            self.__func_() 
            self.__tk_.after( self.__freq_, self.__onTimer )

class BackgroundTask():

    def __init__( self, taskFuncPointer ):
        self.__taskFuncPointer_ = taskFuncPointer
        self.__workerThread_ = None
        self.__isRunning_ = False

    def taskFuncPointer( self ) : return self.__taskFuncPointer_

    def isRunning( self ) : 
        return self.__isRunning_ and self.__workerThread_.isAlive()

    def start( self ): 
        if not self.__isRunning_ :
            self.__isRunning_ = True
            self.__workerThread_ = self.WorkerThread( self )
            self.__workerThread_.start()

    def stop( self ) : self.__isRunning_ = False

    class WorkerThread( threading.Thread ):
        def __init__( self, bgTask ):      
            threading.Thread.__init__( self )
            self.__bgTask_ = bgTask

        def run( self ):
            try :
                self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
            except Exception as e: print repr(e)
            self.__bgTask_.stop()

Вот тест Tk, который демонстрирует их использование. Просто добавьте это в нижнюю часть модуля с этими классами, если вы хотите увидеть демонстрацию в действии:

def tkThreadingTest():

    from tkinter import Tk, Label, Button, StringVar
    from time import sleep

    class UnitTestGUI:

        def __init__( self, master ):
            self.master = master
            master.title( "Threading Test" )

            self.testButton = Button( 
                self.master, text="Blocking", command=self.myLongProcess )
            self.testButton.pack()

            self.threadedButton = Button( 
                self.master, text="Threaded", command=self.onThreadedClicked )
            self.threadedButton.pack()

            self.cancelButton = Button( 
                self.master, text="Stop", command=self.onStopClicked )
            self.cancelButton.pack()

            self.statusLabelVar = StringVar()
            self.statusLabel = Label( master, textvariable=self.statusLabelVar )
            self.statusLabel.pack()

            self.clickMeButton = Button( 
                self.master, text="Click Me", command=self.onClickMeClicked )
            self.clickMeButton.pack()

            self.clickCountLabelVar = StringVar()            
            self.clickCountLabel = Label( master,  textvariable=self.clickCountLabelVar )
            self.clickCountLabel.pack()

            self.threadedButton = Button( 
                self.master, text="Timer", command=self.onTimerClicked )
            self.threadedButton.pack()

            self.timerCountLabelVar = StringVar()            
            self.timerCountLabel = Label( master,  textvariable=self.timerCountLabelVar )
            self.timerCountLabel.pack()

            self.timerCounter_=0

            self.clickCounter_=0

            self.bgTask = BackgroundTask( self.myLongProcess )

            self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )

        def close( self ) :
            print "close"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass            
            self.master.quit()

        def onThreadedClicked( self ):
            print "onThreadedClicked"
            try: self.bgTask.start()
            except: pass

        def onTimerClicked( self ) :
            print "onTimerClicked"
            self.timer.start()

        def onStopClicked( self ) :
            print "onStopClicked"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass                        

        def onClickMeClicked( self ):
            print "onClickMeClicked"
            self.clickCounter_+=1
            self.clickCountLabelVar.set( str(self.clickCounter_) )

        def onTimer( self ) :
            print "onTimer"
            self.timerCounter_+=1
            self.timerCountLabelVar.set( str(self.timerCounter_) )

        def myLongProcess( self, isRunningFunc=None ) :
            print "starting myLongProcess"
            for i in range( 1, 10 ):
                try:
                    if not isRunningFunc() :
                        self.onMyLongProcessUpdate( "Stopped!" )
                        return
                except : pass   
                self.onMyLongProcessUpdate( i )
                sleep( 1.5 ) # simulate doing work
            self.onMyLongProcessUpdate( "Done!" )                

        def onMyLongProcessUpdate( self, status ) :
            print "Process Update: %s" % (status,)
            self.statusLabelVar.set( str(status) )

    root = Tk()    
    gui = UnitTestGUI( root )
    root.protocol( "WM_DELETE_WINDOW", gui.close )
    root.mainloop()

if __name__ == "__main__": 
    tkThreadingTest()

Два пункта импорта. Я хочу подчеркнуть, что BackgroundTask:

1) Функция, которую вы запускаете в фоновой задаче, должна принимать указатель на функцию, который он будет вызывать и уважать, что позволяет отменить задачу в середине пути - если это возможно.

2) Вы должны убедиться, что фоновая задача остановлена ​​при выходе из приложения. Этот поток все равно будет работать, даже если ваш gui будет закрыт, если вы не решите это!