Подтвердить что ты не робот

Совместное использование сложного объекта между процессами Python?

У меня довольно сложный объект Python, который мне нужно разделить между несколькими процессами. Я запускаю эти процессы с помощью multiprocessing.Process. Когда я разделяю объект с multiprocessing.Queue и multiprocessing.Pipe в нем, они разделяются просто отлично. Но когда я пытаюсь разделить объект с другими объектами не-мультипроцессорного модуля, кажется, что Python создает эти объекты. Это правда?

Я попытался использовать многопроцессорную систему. Но я не уверен, какой тип должен быть? Мой класс объектов называется MyClass. Но когда я пытаюсь multiprocess.Value(MyClass, instance), он терпит неудачу с:

TypeError: this type has no size

Любая идея, что происходит?

4b9b3361

Ответ 1

Вы можете сделать это, используя классы Python Multiprocessing "Manager" и класс прокси, который вы определяете. Из документов Python: http://docs.python.org/library/multiprocessing.html#proxy-objects

То, что вы хотите сделать, это определить класс прокси для вашего настраиваемого объекта, а затем обмениваться им с помощью "Remote Manager" - посмотрите примеры на той же странице связанного документа для "удаленного менеджера", где отображаются документы как делиться удаленной очередью. Вы будете делать то же самое, но ваш вызов your_manager_instance.register() будет включать ваш собственный прокси-класс в список аргументов.

Таким образом, вы настраиваете сервер для совместного использования настраиваемого объекта с помощью настраиваемого прокси. Ваши клиенты нуждаются в доступе к серверу (опять же, посмотрите превосходные примеры документации по настройке доступа клиент/сервер к удаленной очереди, но вместо того, чтобы делиться очередью, вы предоставляете доступ к вашему конкретному классу).

Ответ 2

После много исследований и тестирования я нашел "Менеджер" для выполнения этой задачи на уровне некоммерческого.

В приведенном ниже коде показано, что объект inst разделяется между процессами, что означает, что свойство var of inst изменяется вне, когда дочерний процесс меняет его.

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

Хорошо, код выше достаточно, если вам нужно только поделиться простыми объектами.

Почему нет сложного? Поскольку он может терпеть неудачу, если ваш объект вложен (объект внутри объекта):

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there almost no way to do it
    #even if you did register child class, you may get PicklingError :)

Я думаю, что основной причиной такого поведения является то, что Manager - это всего лишь сборка моноблоков поверх низкоуровневых коммуникационных инструментов, таких как pipe/queue.

Таким образом, этот подход не рекомендуется для многопроцессорного случая. Всегда лучше, если вы можете использовать инструменты низкого уровня, такие как lock/semaphore/pipe/queue или высокоуровневые инструменты, такие как Redis queue или Redis publish/subscribe для сложного варианта использования (только моя рекомендация lol).

Ответ 3

вот пакет python, который я сделал для этого (совместное использование сложных объектов между процессами).

git: https://github.com/dRoje/pipe-proxy

Идея заключается в том, что вы создаете прокси для своего объекта и передаете его процессу. Затем вы используете прокси-сервер, как будто у вас есть ссылка на исходный объект. Хотя вы можете использовать только вызовы методов, поэтому доступ к объектным переменным выполняется, отбрасывая сеттеры и геттеры.

Скажем, у нас есть объект, называемый "пример", создание прокси-сервера и прокси-слушателя легко:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

Теперь вы отправляете прокси-сервер другому процессу.

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

Используйте его в другом процессе, так как вы использовали бы оригинальный объект (пример):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

Но вы должны слушать его в основном процессе:

exampleProxyListener.listen()

Подробнее и найти примеры здесь:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

Ответ 4

Чтобы сохранить некоторые головные боли с общими ресурсами, вы можете попытаться собрать данные, которые нуждаются в доступе к однопользовательскому ресурсу в операторе возврата функции, которая отображается, например. pool.imap_unordered, а затем обрабатывать его в цикле, который извлекает частичные результаты:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

Если это не так много данных, которые возвращаются, тогда это может быть не так много накладных расходов.