Подтвердить что ты не робот

Использование пула многопроцессорности python в терминале и в модулях кода для Django или Flask

При использовании multiprocessing.Pool в python со следующим кодом существует несколько причудливых действий.

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

Я получаю следующую ошибку три раза (по одному для каждого потока в пуле), и он печатает "3" через "19":

AttributeError: 'module' object has no attribute 'f'

Первые три вызова apply_async никогда не возвращаются.

Между тем, если я попробую:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))

Я получаю AttributeError 3 раза, оболочка печатает "6" до "19", а затем зависает и не может быть убита [Ctrl] + [C]

В многопроцессорных документах есть следующее:

Функциональность в этом пакете требует, чтобы модуль mainимпортируемые детьми.

Что это значит?

Чтобы уточнить, я запускаю код в терминале для тестирования функциональности, но в конечном итоге я хочу, чтобы это можно было разместить в модулях веб-сервера. Как вы правильно используете multiprocessing.Pool в терминале python и в модулях кода?

4b9b3361

Ответ 1

Это означает, что пулы должны быть инициализированы после определения функций, которые будут на них выполняться. Использование пулов внутри if __name__ == "__main__": blocks работает, если вы пишете автономный скрипт, но это невозможно ни в больших кодовых базах, ни в коде сервера (например, в проекте Django или Flask). Поэтому, если вы пытаетесь использовать пулы в одном из них, обязательно следуйте этим рекомендациям, которые описаны в следующих разделах:

  1. Инициализируйте пулы внизу модулей или внутри функций.
  2. Не вызывайте методы пула в глобальной области видимости модуля.

В качестве альтернативы, если вам нужен только лучший параллелизм ввода-вывода (например, доступ к базе данных или сетевые вызовы), вы можете избавить себя от всей этой головной боли и использовать пулы потоков вместо пулов процессов. Это включает в себя совершенно недокументированные:

from multiprocessing.pool import ThreadPool

Его интерфейс точно такой же, как и у пула, но, поскольку он использует потоки, а не процессы, он не сопровождается ни одним из предостережений, которые используют пулы процессов, за исключением того, что вы не получаете истинного параллелизма выполнения кода, просто параллелизм в блокировании ввода/вывода.


Пулы должны быть инициализированы после определения функций, которые будут на них выполняться

Непонятный текст из документации по python означает, что во время определения пула окружающий модуль импортируется потоками в пуле. В случае терминала python это означает весь и только код, который вы уже запустили.

Поэтому любые функции, которые вы хотите использовать в пуле, должны быть определены до его инициализации. Это верно как для кода в модуле, так и для кода в терминале. Следующие модификации кода в вопросе будут работать нормально:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

Или же

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))

Хорошо, я имею в виду хорошо на Unix. У Windows есть свои проблемы, о которых я не буду здесь говорить.


Предостережения по использованию пулов в модулях

Но подождите, есть еще кое-что (для использования пулов в модулях, которые вы хотите импортировать в другое место)!

Если вы определяете пул внутри функции, у вас нет проблем. Но если вы используете объект Pool в качестве глобальной переменной в модуле, он должен быть определен внизу страницы, а не вверху. Хотя это идет вразрез с большинством хороших стилей кода, это необходимо для функциональности. Способ использования пула, объявленного в верхней части страницы, состоит в том, чтобы использовать его только с функциями, импортированными из других модулей, например:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))

Импорт предварительно сконфигурированного пула из другого модуля довольно ужасен, так как импорт должен идти после того, что вы хотите на нем запустить, например:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))

И, во-вторых, если вы выполняете что-либо в пуле в глобальной области видимости импортируемого модуля, система зависает. то есть это не работает:

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module

Это, однако, работает, пока ничего не импортирует module2:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()

Теперь причины этого только более причудливы и, вероятно, связаны с тем, что код в вопросе выдает ошибку атрибута только один раз каждый, а после этого кажется, что код выполняется правильно. Также кажется, что потоки пула (по крайней мере, с некоторой надежностью) перезагружают код в модуле после выполнения.

Ответ 2

Функция, которую вы хотите выполнить в пуле потоков, уже должна быть определена при создании пула.

Это должно работать:

from multiprocessing import Pool
def f(x): print(x)
if __name__ == '__main__':
    p = Pool(3)
    p.map(f, range(20))

Причина в том, что (по крайней мере, в системах, имеющих fork), когда вы создаете пул, рабочие создаются путем разметки текущего процесса. Поэтому, если целевая функция еще не определена в этой точке, рабочий не сможет ее вызвать.

В окнах это немного отличается, так как в окнах нет fork. Здесь запускаются новые рабочие процессы и импортируется основной модуль. Поэтому в окнах важно защитить исполняемый код с помощью if __name__ == '__main__'. В противном случае каждый новый работник будет повторно выполнять код и поэтому бесконечно запускает новые процессы, сбивая программу (или систему).

Ответ 3

Существует еще один возможный источник этой ошибки. Я получил эту ошибку при запуске кода примера.

Источником было то, что, несмотря на то, что он правильно установил многократную передачу, компилятор С++ не был установлен в моей системе, о чем-то, о чем мне сообщалось, при попытке обновления многопроцессорности. Поэтому, возможно, стоит проверить, что компилятор установлен.