Python `concurrent.futures`: Итерация по фьючерсам в соответствии с порядком завершения - программирование
Подтвердить что ты не робот

Python `concurrent.futures`: Итерация по фьючерсам в соответствии с порядком завершения

Мне нужно что-то похожее на executor.map, за исключением случаев, когда я перебираю результаты, я хочу перебирать их по порядку завершения, например. рабочий элемент, который был завершен первым, должен сначала отображаться на итерации и т.д. Это значит, что итерация блокирует, если каждый отдельный рабочий элемент в последовательности еще не закончен.

Я знаю, как реализовать это самостоятельно, используя очереди, но мне интересно, можно ли использовать фреймворк futures.

(В основном я использовал исполнителей на основе потоков, поэтому мне нужен ответ, который относится к ним, но общий ответ также приветствуется.)

ОБНОВЛЕНИЕ: Спасибо за ответы! Не могли бы вы объяснить, как я могу использовать as_completed с executor.map? executor.map - самый полезный и сжатый инструмент для меня при использовании фьючерсов, и я бы не захотел использовать Future объекты вручную.

4b9b3361

Ответ 1

executor.map(), как встроенный map(), возвращает результаты только в порядке итерации, поэтому, к сожалению, вы не можете использовать его для определения порядка завершения. concurrent.futures.as_completed() - вот пример:

import time
import concurrent.futures

times = [3, 1, 2]

def sleeper(secs):
    time.sleep(secs)
    print('I slept for {} seconds'.format(secs))
    return secs

# returns in the order given
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(sleeper, times)))

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [3, 1, 2]

# returns in the order completed
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futs = [executor.submit(sleeper, secs) for secs in times]
    print([fut.result() for fut in concurrent.futures.as_completed(futs)])

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [1, 2, 3]

Конечно, если вам нужно использовать интерфейс карты, вы можете создать свою собственную функцию map_as_completed(), которая инкапсулирует вышеизложенное (возможно, добавит ее в подклассу Executor()), но я думаю, что создание экземпляров фьючерсов через executor.submit() это более простой и понятный способ (также позволяет вам предоставлять no-args, kwargs).

Ответ 2

параллельные фьючерсы возвращают итератор на основе времени завершения - это похоже на то, что вы искали.

http://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.as_completed

Пожалуйста, дайте мне знать, если у вас возникли какие-либо путаницы или трудности с реализацией. С наилучшими пожеланиями, -Давид

Ответ 3

Из документа python

concurrent.futures.as_completed(fs, timeout=None)¶ 

Возвращает итератор   над будущими инстанциями (возможно, создан различными Исполнителями   экземпляры), заданные fs, что дает фьючерсы по мере их завершения (завершено   или были отменены). Любые фьючерсы, которые были выполнены до того, как as_completed()   называется первым. Возвращенный итератор поднимает   TimeoutError, если вызывается next(), и результат не доступен   после таймаута секунд от исходного вызова до as_completed().   timeout может быть int или float. Если тайм-аут не указан или None,   нет времени ожидания.

Вам нужно понять разницу между executor.map() и executor.submit(). Первый отображает функцию в вектор аргументов. Он очень похож на map, но запускает задачи асинхронно. submit(func, arg) запускает одну задачу при каждом вызове. В этой задаче func применяется к arg.

Вот пример использования as_completed() с submit(), который я мог бы запустить на python 3.0

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()
Здесь используется

no map(), задачи выполняются с submit и as_completed()

возвращает итератор по экземплярам Future, заданным fs, который дает фьючерсы по мере их завершения (завершены или отменены).