Подтвердить что ты не робот

Как использовать многопроцессорность с экземплярами класса в Python?

Я пытаюсь создать класс, который может запускать отдельный процесс, чтобы выполнить некоторую работу, которая занимает много времени, запустить кучу из основного модуля, а затем дождаться их завершения. Я хочу запускать процессы один раз, а затем продолжать кормить их, а не создавать и уничтожать процессы. Например, возможно, у меня есть 10 серверов, на которых запущена команда dd, тогда я хочу, чтобы все они scp файл и т.д.

Моя конечная цель - создать класс для каждой системы, который отслеживает информацию для системы, в которой она привязана как к IP-адресу, журналам, времени выполнения и т.д. Но этот класс должен иметь возможность запускать системную команду а затем вернуть выполнение обратно вызывающему абоненту во время выполнения этой системной команды, чтобы выполнить последующий результат с помощью системной команды позже.

Моя попытка не работает, потому что я не могу отправить метод экземпляра класса по каналу в подпроцесс через pickle. Они не разборчивы. Поэтому я попытался исправить это различными способами, но я не могу понять это. Как мой код может быть исправлен, чтобы сделать это? Какая польза от многопроцессорности, если вы не можете отправить что-нибудь полезное?

Есть ли хорошая документация по многопроцессорной обработке, используемой с экземплярами класса? Единственный способ заставить мультипроцессорный модуль работать на простых функциях. Каждая попытка использовать его в экземпляре класса не удалась. Может быть, я должен передать события? Я еще не понимаю, как это сделать.

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launce the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place an command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retreive the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

Изменить: я попытался переместить класс ProcessWorker и создать очереди многопроцессорности вне класса Worker, а затем попытался вручную рассортировать рабочий экземпляр. Даже это не работает, и я получаю сообщение об ошибке

RuntimeError: объекты очереди должны делиться только между процессами через наследование

. Но я только передаю ссылки этих очередей в рабочий экземпляр? Мне не хватает чего-то фундаментального. Вот модифицированный код из основного раздела:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
4b9b3361

Ответ 1

Вместо того, чтобы пытаться отправить сам метод (что нецелесообразно), попробуйте отправить имя метода для выполнения.

При условии, что каждый рабочий запускает тот же код, это вопрос простого getattr(self, task_name).

Я бы передал кортежи (task_name, task_args), где task_args был dict, который напрямую передавался методу задачи:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break

Ответ 2

Итак, проблема заключалась в том, что я предполагал, что Python делает какую-то магию, которая чем-то отличается от того, как работает С++/fork(). Я почему-то думал, что Python только скопировал класс, а не всю программу в отдельный процесс. Я серьезно потратил впустую дни, пытаясь заставить это работать, потому что все разговоры о сериализации сеялки заставили меня подумать, что это действительно послало все по трубе. Я знал, что некоторые вещи не могут быть отправлены по трубе, но я думал, что моя проблема в том, что я не собирал вещи правильно.

Этого можно было избежать, если бы документы Python дали мне представление в 10 000 футов о том, что происходит, когда этот модуль используется. Конечно, это говорит мне, что делают методы многопроцессорного модуля, и дает мне некоторые базовые примеры, но то, что я хочу знать, - это то, что "Теория операции" за кулисами! Вот такая информация, которую я мог бы использовать. Пожалуйста, перезвоните, если мой ответ отключен. Это поможет мне учиться.

Когда вы запускаете процесс с использованием этого модуля, вся программа копируется в другой процесс. Но поскольку это не процесс "__main__", и мой код проверял это, он не запускает еще один процесс бесконечно. Он просто останавливается и сидит там, ожидая чего-то, как зомби. Все, что было инициализировано родителем во время вызова multiprocess.Process(), настроено и готово к работе. Как только вы помещаете что-то в multiprocess.Queue или shared memory, или pipe и т.д. (Однако вы общаетесь), тогда отдельный процесс получает его и начинает работать. Он может использовать все импортированные модули и настраиваться так же, как если бы он был родителем. Однако, как только некоторые внутренние переменные состояния изменяются в родительском или отдельном процессе, эти изменения изолированы. Как только процесс порождается, теперь становится вашей задачей сохранять синхронизацию, если необходимо, либо через очередь, канал, разделяемую память и т.д.

Я выкинул код и начал работу, но теперь я просто помещаю одну дополнительную функцию в метод ProcessWorker, "execute", который запускает командную строку. Довольно просто. Мне не нужно беспокоиться о запуске, а затем закрывать кучу процессов таким образом, что в прошлом вызывало у меня все проблемы нестабильности и производительности в С++. Когда я переключился на запуск процессов в начале и затем передал сообщения этим ожидающим процессам, моя производительность улучшилась, и она была очень стабильной.

Кстати, я просмотрел эту ссылку, чтобы получить помощь, которая отбросила меня, потому что пример заставил меня подумать, что методы переносятся по очереди: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html Второй пример первого раздела использовал "next_task()", который появился (для меня) для выполнения задачи, полученной через очередь.

Ответ 3

REF: fooobar.com/info/248281/...

Ответ на 6 января в 6:03 Дэвидом Линчем не является фактически правильным, когда он говорит, что он был введен в заблуждение http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html.

Приведенный код и примеры являются правильными и работают как рекламируемые. next_task() выполняет задачу, полученную через очередь, - попытайтесь понять, что делает метод Task.__call__().

В моем случае, что, сбило меня с толку, было синтаксическими ошибками в моей реализации run(). Кажется, что подпроцесс не будет сообщать об этом и просто терпит неудачу - оставив вещи застрявшими в странных циклах! Убедитесь, что у вас есть какая-то проверка синтаксиса, например, Flymake/Pyflakes в Emacs.

Отладка через multiprocessing.log_to_stderr() F помогла мне сузить проблему.