Подтвердить что ты не робот

Как использовать инициализатор для настройки моего многопроцессорного пула?

Я пытаюсь использовать объект многопроцессного пула. Я хотел бы, чтобы каждый процесс открыл соединение с базой данных при его запуске, а затем использовал это соединение для обработки передаваемых данных. (Вместо того, чтобы открывать и закрывать соединение для каждого бита данных.) Кажется, что инициализатор для, но я не могу оборачивать голову тем, как взаимодействует рабочий и инициализатор. Поэтому у меня есть что-то вроде этого:

def get_cursor():
  return psycopg2.connect(...).cursor()

def process_data(data):
   # here I'd like to have the cursor so that I can do things with the data

if __name__ == "__main__":
  pool = Pool(initializer=get_cursor, initargs=())
  pool.map(process_data, get_some_data_iterator())

как мне (или сделать I) вернуть курсор из get_cursor() в process_data()?

4b9b3361

Ответ 1

Функция инициализации вызывается так:

def worker(...):
    ...
    if initializer is not None:
        initializer(*args)

поэтому никакого возвращаемого значения не сохраняется. Вы можете подумать, что это обрекает вас, но нет! Каждый работник находится в отдельном процессе. Таким образом, вы можете использовать обычную переменную global.

Это не совсем красиво, но работает:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...

Теперь вы можете просто использовать cursor в своей функции process_data. Переменная cursor внутри каждого отдельного процесса отделена от всех других процессов, поэтому они не наступают друг на друга.

(я понятия не имею, имеет ли psycopg2 другой способ борьбы с этим, который не включает в себя прежде всего использование multiprocessing, это подразумевается как общий ответ на общую проблему с модулем multiprocessing.)

Ответ 2

torek уже дал хорошее объяснение, почему инициализатор не работает в этом случае. Тем не менее, я не поклонник Глобальной переменной лично, поэтому я хотел бы вставить другое решение здесь.

Идея состоит в том, чтобы использовать класс для переноса функции и инициализировать класс с помощью "глобальной" переменной.

class Processor(object):
  """Process the data and save it to database."""

  def __init__(self, credentials):
    """Initialize the class with 'global' variables"""
    self.cursor = psycopg2.connect(credentials).cursor()

  def __call__(self, data):
    """Do something with the cursor and data"""
    self.cursor.find(data.key)

И затем вызовите с помощью

p = Pool(5)
p.map(Processor(credentials), list_of_data)

Итак, первый параметр инициализировал класс с учетными данными, возвращает экземпляр класса и карты, вызывающий экземпляр с данными.

Хотя это не так просто, как решение глобальной переменной, я настоятельно рекомендую избегать глобальной переменной и инкапсулировать переменные каким-то безопасным способом. (И я действительно хочу, чтобы они могли поддерживать выражение лямбда в один прекрасный день, это значительно упростит...)

Ответ 3

Вы также можете отправить функцию вместе с инициализатором и создать в нем соединение. После этого вы добавляете курсор к функции.

def init_worker(function):
    function.cursor = db.conn()

Теперь вы можете получить доступ к db через function.cursor, не используя глобальные переменные, например:

def use_db(i):
    print(use_db.cursor) #process local
pool = Pool(initializer=init_worker, initargs=(use_db,))
pool.map(use_db, range(10))

Ответ 4

Учитывая, что определение глобальных переменных в инициализаторе обычно нежелательно, мы можем избежать их использования, а также избежать повторной дорогостоящей инициализации в каждом вызове с простым кэшированием в каждом подпроцессе:

from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep


@lru_cache(maxsize=None)
def _initializer(a, b):
    print(f'Initialized with {a}, {b}')


def _pool_func(a, b, i):
    _initializer(a, b)
    sleep(1)
    print(f'got {i}')


arg_a = 1
arg_b = 2

with Pool(processes=10) as pool:
    pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 100)))