Подтвердить что ты не робот

Предоставление доступа к общей памяти после того, как дочерние процессы уже запущены

Как предоставить дочерним процессам доступ к данным в общей памяти, если данные доступны только после того, как дочерние процессы были созданы (используя multiprocessing.Process)?

Я знаю multiprocessing.sharedctypes.RawArray, но я не могу понять, как предоставить дочерним процессам доступ к RawArray который создается после того, как процессы уже запущены.

Данные генерируются родительским процессом, и количество данных заранее неизвестно.

Если не для GIL, вместо этого я буду использовать threading, что сделает эту задачу немного проще. Использование реализации, отличной от CPython, не является вариантом.


Глядя под капот muliprocessing.sharedctypes, похоже, что выделены общие объекты ctype с использованием mmap ed memory.

Итак, этот вопрос действительно сводится к следующему: Может ли дочерний процесс получить доступ к анонимно отображаемой памяти, если родительский элемент после вызова дочернего процесса был вызван mmap()?

Это несколько в духе того, что задается в этом вопросе, за исключением того, что в моем случае вызывающим абонентом mmap() является родительский процесс, а не дочерний процесс.


(Раскрыты)

Я создал свою собственную версию RawArray, которая использует shm_open() под капотом. Получаемый общий массив ctypes может использоваться совместно с любым процессом до тех пор, пока совпадает идентификатор (tag).

Подробнее см. этот ответ.

4b9b3361

Ответ 1

Ваша проблема звучит как идеальная подгонка для модулей posix_ipc или sysv_ipc, которые предоставляют либо API POSIX, либо SysV для общих памяти, семафоров и очередей сообщений. В матрице функций есть отличные рекомендации для выбора среди модулей, которые он предоставляет.

Проблема с анонимными областями mmap(2) заключается в том, что вы не можете легко поделиться ими с другими процессами - если бы они были с файловой поддержкой, это было бы легко, но если вам действительно не нужен файл ни для чего другого, он чувствует себя глупо. Вы могли бы использовать флаг CLONE_VM для системного вызова clone(2), если бы это было на C, но я бы не хотел его использовать с интерпретатором языка, который, вероятно, делает предположения о безопасности памяти. (Это было бы немного опасно даже в C, поскольку программисты-программисты через пять лет также могут быть шокированы поведением CLONE_VM.)

Но SysV и новые сопоставления общей памяти POSIX позволяют даже несвязанным процессам присоединяться и отделяться от общей памяти по идентификатору, поэтому все, что вам нужно сделать, это обмениваться идентификатором с процессами, которые создают сопоставления с процессами, которые потребляют сопоставления, а затем, когда вы манипулируете данными в сопоставлениях, они доступны для всех процессов одновременно без каких-либо дополнительных служебных операций синтаксического анализа. Функция shm_open(3) возвращает int, которая используется в качестве дескриптора файла при последующих вызовах ftruncate(2), а затем mmap(2), поэтому другие процессы могут использовать сегмент разделяемой памяти без файла, создаваемого в файловой системе, - и эта память будет сохраняться, даже если все процессы, использующие ее, вышли. (Возможно, немного странно для Unix, но оно гибкое.)

Ответ 2

Отказ от ответственности: я являюсь автором вопроса.

В конце концов я использовал модуль posix_ipc, чтобы создать свою собственную версию RawArray. Я использовал главным образом posix_ipc.SharedMemory, который вызывает shm_open() под капотом.

Моя реализация (ShmemRawArray) предоставляет ту же функциональность, что и RawArray, но требует двух дополнительных параметров - a tag для однозначной идентификации области разделяемой памяти и флага create для определения того, нужно ли нам создать новый сегмент разделяемой памяти или привязку к существующему.

Вот копия, если кому интересно: https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

Примечания по использованию:

  • Первые два аргумента (typecode_or_type и size_or_initializer) должны работать так же, как с RawArray.
  • Общий массив доступен любому процессу, если соответствует tag.
  • Сегмент разделяемой памяти отсоединяется, когда объект происхождения (возвращаемый ShmemRawArray(..., create=True)) удален
  • Создание общего массива с использованием tag, который в настоящее время существует, приведет к созданию ExistentialError
  • Доступ к разделяемому массиву с помощью tag, который не существует (или тот, который был отсоединен) также приведет к созданию ExistentialError

A SSCCE (Short, Self Contained, Compilable Example), показывающий его в действии.

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

Выполнение этого результата приводит к следующему выводу:

[[email protected]]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']

Ответ 3

Я думаю, что вы ищете mmap module

относительно сериализации данных этот вопрос ответ, конечно, если вы надеетесь избежать копирования, у меня нет решения

ИЗМЕНИТЬ

на самом деле вы можете использовать модуль nonstdlib _mutliprocessing в CPython 3.2, чтобы иметь адрес объекта mmap и использовать его с from_address объекта ctypes это то, что на самом деле то, что делает RawArray на самом деле, конечно, вы не должны пытаться изменять размер объекта mmap, так как адрес mmap может измениться в этом случае

import mmap
import _multiprocessing
from ctypes import Structure,c_int

map = mmap.mmap(-1,4)
class A(Structure):
    _fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256

>>> map[0:4]
'\x00\x01\x00\x00'

чтобы вывести память после создания ребенка, вам нужно сопоставить свою память с реальным файлом, который вызывает

map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)