Подтвердить что ты не робот

Как многопоточная операция внутри цикла в Python

Скажем, у меня очень большой список, и я выполняю такую ​​операцию:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

Моя проблема в два раза:

  • Есть много элементов
  • api.my_operation принимает навсегда возврат

Я хотел бы использовать многопоточность, чтобы развернуть кучу api.my_operations сразу, чтобы я мог обрабатывать, возможно, 5 или 10 или даже 100 элементов одновременно.

Если my_operation() возвращает исключение (потому что, возможно, я уже обработал этот элемент) - это ОК. Это ничего не сломает. Цикл может продолжаться до следующего элемента.

Примечание: это для Python 2.7.3

4b9b3361

Ответ 1

Во-первых, в Python, если ваш код связан с ЦП, многопоточность не поможет, потому что только один поток может удерживать блокировку Global Interpreter и, следовательно, запускать код Python за раз. Итак, вам нужно использовать процессы, а не потоки.

Это неверно, если ваша операция "навсегда вернется", потому что она привязана к IO, то есть ожидает в сети или на диске копии или тому подобное. Я вернусь к этому позже.


Далее, способ обрабатывать 5 или 10 или 100 элементов одновременно - создать пул из 5 или 10 или 100 рабочих и поместить элементы в очередь, обслуживаемую службой. К счастью, библиотеки stdlib multiprocessing и concurrent.futures оборачивают большую часть деталей для вас.

Первый является более мощным и гибким для традиционного программирования; последнее проще, если вам нужно составить будущее; для тривиальных случаев, это действительно не имеет значения, что вы выбираете. (В этом случае наиболее очевидная реализация с каждой из них занимает 3 строки с futures, 4 строки с multiprocessing.)

Если вы используете 2.6-2.7 или 3.0-3.1, futures не встроен, но вы можете установить его из PyPI (pip install futures).


Наконец, проще всего распараллелить вещи, если вы можете превратить всю итерацию цикла в вызов функции (что-то, что вы могли бы, например, перейти к map), так что сделайте это первым:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Объединяя все это:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

Если у вас много относительно небольших заданий, накладные расходы на многопроцессорную обработку могут увеличивать прибыль. Способ решения этой проблемы заключается в том, чтобы запустить работу в более крупные рабочие места. Например (используя grouper из itertools рецептов, который вы можете скопировать и вставить в свой код или получить из проекта more-itertools на PyPI):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

Наконец, что, если ваш код привязан к IO? Тогда потоки так же хороши, как и процессы, и с меньшими затратами (и меньше ограничений, но эти ограничения обычно не будут влиять на вас в таких случаях). Иногда, что "меньше накладных расходов" достаточно, чтобы вы не нуждались в доработке с потоками, но вы делаете с процессами, что является хорошей победой.

Итак, как вы используете потоки вместо процессов? Просто измените ProcessPoolExecutor на ThreadPoolExecutor.

Если вы не знаете, является ли ваш код привязанным к процессору или привязанным к IO, просто попробуйте в обоих направлениях.


Могу ли я сделать это для нескольких функций в моем python script? Например, если бы у меня был другой цикл в другом месте кода, который я хотел распараллелить. Возможно ли выполнить две многопотоковые функции в одном и том же script?

Да. Фактически, есть два разных способа сделать это.

Во-первых, вы можете использовать один и тот же (потоковый или процесс) исполнитель и использовать его из нескольких мест без проблем. Весь смысл задач и фьючерсов заключается в том, что они самодостаточны; вам все равно, куда они бегут, просто чтобы вы их поставили в очередь и в итоге получили ответ.

В качестве альтернативы вы можете иметь двух исполнителей в одной и той же программе без проблем. Это связано с производительностью - если вы одновременно используете обоих исполнителей, вы попытаетесь запустить (например) 16 занятых потоков на 8 ядер, что означает, что произойдет некоторое переключение контекста. Но иногда это стоит того, потому что, скажем, оба исполнителя редко бывают заняты одновременно, и это делает ваш код намного проще. Или, может быть, один исполнитель запускает очень большие задачи, которые могут занять некоторое время, а в другом - очень маленькие задачи, которые нужно выполнить как можно быстрее, потому что отзывчивость важнее, чем пропускная способность для части вашей программы.

Если вы не знаете, что подходит для вашей программы, обычно это первый.

Ответ 2

Изменить 2018-02-06: ревизия на основе этого комментария

Изменить: забыл упомянуть, что это работает на Python 2.7.x

Там multiprocesing.pool, и следующий пример иллюстрирует, как использовать один из них:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Теперь, если вы действительно определили, что ваш процесс связан с процессором, как упомянуто @abarnert, измените ThreadPool на реализацию пула процессов (закомментировано в разделе Импорт ThreadPool). Вы можете найти более подробную информацию здесь: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

Ответ 3

Вы можете разделить обработку на указанное число потоков, используя такой подход:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)

Ответ 4

import numpy as np
import threading


def threaded_process(items_chunk):
    """ Your main process which runs in thread for each chunk"""
    for item in items_chunk:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')  

n_threads = 20
# Splitting the items into chunks equal to number of threads
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
    thread = threading.Thread(target=threaded_process, args=(array_chunk[thr]),)
    thread_list.append(thread)
    thread_list[thr].start()

for thread in thread_list:
    thread.join()