Подтвердить что ты не робот

Python ThreadPoolExecutor - проверяется ли обратный вызов в том же потоке, что и отправленный func?

В ThreadPoolExecutor (TPE), всегда ли обратный вызов работает в том же потоке, что и представленная функция?

Например, я проверил это со следующим кодом. Я запускал его много раз, и казалось, что func и callback всегда работают в одном потоке.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

Однако, когда я удалил операторы time.sleep(random.random()), я потерял, по крайней мере, несколько функций func и callbacks не выполнял в том же потоке.

Для проекта, над которым я работаю, обратный вызов должен всегда запускаться в том же потоке, что и переданная функция, поэтому я хотел быть уверенным, что это гарантировано TPE. (А также результаты теста без случайного сна казались озадачивающими).

Я просмотрел исходный код для исполнителей, и, похоже, мы не переключили поток в основной поток, прежде чем запускаем обратный вызов. Но просто хотел быть уверенным.

4b9b3361

Ответ 1

Документация не гарантирует, какие обратные вызовы потока выполняются. Единственная документированная гарантия состоит в том, что обратные вызовы будут выполняться в потоке, принадлежащем процессу, добавившему обратный вызов, но это может быть любой поток, поскольку вы используете ThreadPoolExecutor вместо ProcessPoolExecutor:

Добавленные вызовы вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил.


В текущей реализации ThreadPoolExecutor поток, в котором выполняется обратный вызов, зависит от состояния Future во время добавления обратного вызова и от того, отменено ли Future или нет. Это детали реализации; вам не следует полагаться на них, так как они могут отличаться в разных реализациях Python или в разных версиях, и они могут быть изменены без предварительного уведомления.

Если вы добавите обратный вызов после завершения Future, обратный вызов будет выполняться в любом потоке, в котором вы вызвали add_done_callback. Это можно увидеть, посмотрев источник add_done_callback:

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

Если состояние Future указывает на то, что оно отменено или завершено, fn просто немедленно вызывается в текущем потоке выполнения. В противном случае он добавляется во внутренний список обратных вызовов для запуска, когда Future завершен.

Например:

>>> def func(*args):
...  time.sleep(5)
...  print("func {}".format(threading.current_thread()))
>>> def cb(a): print("cb {}".format(threading.current_thread()))
... 
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut = e.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>

Если будущее отменяется успешным вызовом cancel, то поток, выполняющий отмену, немедленно вызывает все обратные вызовы:

def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

В противном случае обратные вызовы вызываются потоком, который выполняет будущую задачу.