Подтвердить что ты не робот

Объединение Pool.map с разделяемой памятью Array в многопроцессорной среде Python

У меня есть очень большой (только для чтения) массив данных, который я хочу обрабатывать несколькими процессами параллельно.

Мне нравится функция Pool.map и хотел бы использовать ее для вычисления функций по этим данным параллельно.

Я видел, что для использования данных разделяемой памяти между процессами можно использовать класс Value или Array. Но когда я пытаюсь использовать это, я получаю RuntimeError: "Объекты SynchronizedString должны использоваться только совместно между процессами через наследование при использовании функции Pool.map:

Вот упрощенный пример того, что я пытаюсь сделать:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Может ли кто-нибудь сказать мне, что я делаю неправильно здесь?

Так что я хотел бы сделать, это передать информацию о вновь созданном распределенном распределяемом массиве памяти для процессов после того, как они были созданы в пуле процессов.

4b9b3361

Ответ 1

Попытка снова, как только я увидел щедрость;)

В принципе, я думаю, что сообщение об ошибке означает, что он сказал - многопроцессорная разделяемая память. Массивы не могут передаваться в качестве аргументов (путем травления). Не имеет смысла сериализовать данные - точка - это данные общей памяти. Таким образом, вы должны сделать общий массив глобальным. Я считаю, что опередить его как атрибут модуля, как и в моем первом ответе, но просто оставить его как глобальную переменную в вашем примере также хорошо работает. Принимая во внимание вашу мысль о том, что вы не хотите устанавливать данные перед вилкой, здесь приведен пример. Если вы хотите иметь более одного возможного общего массива (и почему вы хотели передать toShare в качестве аргумента), вы также могли бы составить глобальный список общих массивов и просто передать индекс count_it (который станет for c in toShare[i]:),

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[EDIT: вышеописанное не работает в Windows из-за того, что не используется fork. Однако нижеследующее работает в Windows, все еще используя пул, поэтому я думаю, что это самое близкое к тому, что вы хотите:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Не знаю, почему карта не будет сортировать массив, но процесс и пул - я думаю, возможно, это было перенесено в точке инициализации подпроцесса в окнах. Обратите внимание, что данные все еще установлены после fork.

Ответ 2

Проблема, которую я вижу, заключается в том, что пул не поддерживает травление общих данных через свой список аргументов. То, что сообщение об ошибке означает "объекты должны быть разделены между процессами через наследование". Общие данные должны быть унаследованы, т.е. Глобальны, если вы хотите поделиться им с помощью класса Pool.

Если вам нужно передать их явно, вам, возможно, придется использовать multiprocessing.Process. Вот ваш пример с переработкой:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Выход: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

Порядок упорядочения элементов очереди может меняться.

Чтобы сделать это более общим и похожим на Pool, вы можете создать фиксированное число N процессов, разделить список ключей на N частей, а затем использовать функцию-оболочку в качестве цели процесса, которая вызовет count_it для каждой клавиши в списке он передается, например:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

Ответ 3

Если данные считываются, просто сделайте его переменной в модуле до fork из пула. Тогда все дочерние процессы должны иметь к нему доступ, и он не будет скопирован, если вы не напишете ему.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Если вы хотите использовать Array, хотя можете попробовать с аргументом ключевого слова lock=False (по умолчанию это правда).

Ответ 4

Если вы видите ошибку RuntimeError: Synchronized objects should only be shared between processes through inheritance, рассмотрите возможность использования multiprocessing.Manager, поскольку она не имеет этого ограничения. Менеджер работает, учитывая, что он предположительно работает в отдельном процессе.

import ctypes
import multiprocessing

manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1

Ответ 5

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из разделяемой памяти, которые могут быть унаследованы дочерними процессами.

Итак, ваше использование sharedctypes неверно. Вы хотите наследовать этот массив из родительского процесса или хотите передать его явно? В первом случае вам нужно создать глобальную переменную, как предлагают другие ответы. Но вам не нужно использовать sharedctypes, чтобы передать его явно, просто передайте оригинал testData.

Кстати, использование Pool.map() неверно. Он имеет тот же интерфейс, что и встроенная функция map() (вы перепутали ее с starmap()?). Ниже приведен пример работы с явным передачей массива:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])