Подтвердить что ты не робот

Как отличается ThreadPoolExecutor(). Map отличается от ThreadPoolExecutor()? Submit?

Меня просто смутил какой-то код, который я написал. Я с удивлением обнаружил, что:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

и

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

дают разные результаты. Первый создает список любого типа f, второй создает список объектов concurrent.futures.Future, которые затем должны оцениваться с помощью метода result(), чтобы получить возвращаемое значение f.

Моя основная проблема заключается в том, что это означает, что executor.map не может использовать concurrent.futures.as_completed, что представляется чрезвычайно удобным способом оценки результатов некоторых длительных вызовов базы данных, которую я создаю как они становятся доступными.

Я не совсем понимаю, как работают объекты concurrent.futures.ThreadPoolExecutor - наивно, я бы предпочел (несколько более подробный):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

более кратким executor.map, чтобы использовать возможное усиление производительности. Неужели я ошибаюсь?

4b9b3361

Ответ 1

Проблема заключается в том, что вы преобразовываете результат ThreadPoolExecutor.map в список. Если вы этого не сделаете и вместо этого перейдете по результирующему генератору, результаты все равно будут получены в правильном порядке, но цикл будет продолжаться до того, как все результаты будут готовы. Вы можете проверить это в этом примере:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

Чтобы сохранить заказ, может быть, потому что иногда важно, чтобы вы получили результаты в том же порядке, который вы им даете для сопоставления. И результаты, вероятно, не завернуты в будущие объекты, потому что в некоторых ситуациях может потребоваться слишком много времени, чтобы сделать другую карту по списку, чтобы получить все результаты, если они вам понадобятся. А ведь в большинстве случаев очень вероятно, что следующее значение будет готово до того, как цикл обработает первое значение. Это показано в этом примере:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

В этом примере может оказаться, что do_some_stuff занимает больше времени crunch_number, и если это действительно так, это действительно не большая потеря производительности, в то время как вы все еще сохраняете удобное использование карты.

Также, поскольку рабочие потоки (/процессы) начинают обработку в начале списка и работают до конца в список, который вы отправили, результаты должны быть завершены в том порядке, в котором они уже получены итератором. Это означает, что в большинстве случаев executor.map просто отлично, но в некоторых случаях, например, если неважно, в каком порядке вы обрабатываете значения, а функция, которую вы передали на map, занимает совсем другое время для запуска, t27 > может быть быстрее.

Ответ 2

Ниже приведен пример отправки против карты. Они оба сразу же принимают задания (отправлено | сопоставлено - начало). Они требуют одинакового времени для завершения, 11 секунд (время последнего результата - старт). Тем не менее, submit выдает результаты, как только завершается работа любого потока в ThreadPoolExecutor maxThreads = 2. Карта дает результаты в порядке их отправки.

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

Выход:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

Ответ 3

В дополнение к объяснению в ответах здесь, может быть полезно перейти прямо к источнику. Это подтверждает утверждение из другого ответа здесь:

  • .map() выдает результаты в том порядке, в котором они были отправлены, а
  • перебор списка объектов Future с помощью concurrent.futures.as_completed() не гарантирует этот порядок, поскольку это характер as_completed()

.map() определен в базовом классе concurrent.futures._base.Executor:

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

Как вы упомянули, есть также .submit(), который оставлен для определения в дочерних классах, а именно ProcessPoolExecutor и ThreadPoolExecutor, и возвращает экземпляр _base.Future, для которого вам нужно вызвать .result() чтобы фактически заставить что-либо делать.

Важные строки из .map() сводятся к:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse() plus .pop() - это средство для получения первого отправленного результата (из iterables), который должен быть iterables первым, второго отправленного результата, который будет получен вторым, и так далее. Элементы получающегося итератора не являются Future; они сами фактические результаты.