Я пытаюсь создать класс, который может запускать отдельный процесс, чтобы выполнить некоторую работу, которая занимает много времени, запустить кучу из основного модуля, а затем дождаться их завершения. Я хочу запускать процессы один раз, а затем продолжать кормить их, а не создавать и уничтожать процессы. Например, возможно, у меня есть 10 серверов, на которых запущена команда dd, тогда я хочу, чтобы все они scp файл и т.д.
Моя конечная цель - создать класс для каждой системы, который отслеживает информацию для системы, в которой она привязана как к IP-адресу, журналам, времени выполнения и т.д. Но этот класс должен иметь возможность запускать системную команду а затем вернуть выполнение обратно вызывающему абоненту во время выполнения этой системной команды, чтобы выполнить последующий результат с помощью системной команды позже.
Моя попытка не работает, потому что я не могу отправить метод экземпляра класса по каналу в подпроцесс через pickle. Они не разборчивы. Поэтому я попытался исправить это различными способами, но я не могу понять это. Как мой код может быть исправлен, чтобы сделать это? Какая польза от многопроцессорности, если вы не можете отправить что-нибудь полезное?
Есть ли хорошая документация по многопроцессорной обработке, используемой с экземплярами класса? Единственный способ заставить мультипроцессорный модуль работать на простых функциях. Каждая попытка использовать его в экземпляре класса не удалась. Может быть, я должен передать события? Я еще не понимаю, как это сделать.
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
# Launce the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
Used to place an command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
is called on each one to retreive the results of the call.
This blocks until the execution of the item in the queue is complete
"""
self.task_q.join() # Wait for it to to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
Изменить: я попытался переместить класс ProcessWorker
и создать очереди многопроцессорности вне класса Worker
, а затем попытался вручную рассортировать рабочий экземпляр. Даже это не работает, и я получаю сообщение об ошибке
RuntimeError: объекты очереди должны делиться только между процессами через наследование
. Но я только передаю ссылки этих очередей в рабочий экземпляр? Мне не хватает чего-то фундаментального. Вот модифицированный код из основного раздела:
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")