Подтвердить что ты не робот

Python Multiprocessing: обработка ошибок для детей в родительском

В настоящее время я играю с многопроцессорностью и очередями. Я написал фрагмент кода для экспорта данных из mongoDB, сопоставил его в реляционную (плоскую) структуру, преобразовал все значения в строку и вставлял их в mysql.

Каждый из этих шагов представляется в виде процесса и задает очереди импорта/экспорта, безопасные для экспорта mongoDB, которые обрабатываются в родительском.

Как вы увидите ниже, я использую очереди, а дочерние процессы завершаются, когда они читают "Нет" из очереди. Проблема, которую я имею в настоящее время, заключается в том, что если дочерний процесс запускается в необработанное Exception, это не распознается родителем, а остальные просто продолжают работать. То, что я хочу, это то, что весь shebang завершает работу и, в лучшем случае, ререйзирует ошибку ребенка.

У меня есть два вопроса:

  • Как определить дочернюю ошибку в родительском?
  • Как убить мои дочерние процессы после обнаружения ошибки (наилучшей практики)? Я понимаю, что поставить "Нет" в очередь, чтобы убить ребенка, довольно грязно.

Я использую python 2.7.

Вот основные части моего кода:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only one, 
    # but I have the option to parallellize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... инициализация соединения mongo db...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()
4b9b3361

Ответ 1

Я не знаю стандартной практики, но я обнаружил, что для надежной многопроцессорности я разрабатываю методы/класс/и т.д. специально для работы с многопроцессорной обработкой. В противном случае вы никогда не знаете, что происходит с другой стороны (если я не пропустил какой-то механизм для этого).

В частности, я делаю это:

  • Подкласс multiprocessing.Process или создавать функции, которые специально поддерживают многопроцессорность (функции обертывания, которые вы не имеете контроля в случае необходимости)
  • всегда обеспечивает общую ошибку multiprocessing.Queue от основного процесса до каждого рабочего процесса.
  • заключить весь код запуска в try: ... except Exception as e. Затем, когда произойдет что-то неожиданное, отправьте пакет ошибок с:
    • идентификатор процесса, который умер
    • исключение с его исходным контекстом (здесь). Исходный контекст действительно важен, если вы хотите записывать полезную информацию в основной процесс.
  • конечно, обрабатывать ожидаемые проблемы как обычно при нормальной работе рабочего
  • (похоже на то, что вы сказали уже), предполагая длительный процесс, завершите текущий код (внутри try/catch-all) с помощью цикла
    • определить токен остановки в классе или для функций.
    • Когда основной процесс хочет, чтобы работник (ов) остановился, просто отправьте маркер остановки. чтобы остановить всех, отправить достаточно для всех процессов.
    • цикл упаковки проверяет вход q для токена или любой другой вход, который вы хотите

Конечным результатом являются рабочие процессы, которые могут длиться долгое время, и это может дать вам знать, что происходит, когда что-то идет не так. Они будут умирать спокойно, так как вы сможете справиться с тем, что вам нужно сделать после исключения, и вы также узнаете, когда вам нужно перезапустить рабочего.

Опять же, я только что пришел к этому шаблону с помощью проб и ошибок, поэтому я не знаю, насколько это стандартизировано. Помогает ли это с тем, что вы просите?

Ответ 2

Почему бы не позволить процессу заботиться о своих собственных исключениях, например:

import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

Теперь у вас есть как ошибка, так и трассировка в ваших руках:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print traceback

С уважением, Marek

Ответ 3

Благодаря kobejohn я нашел решение, которое хорошо и стабильно.

  • Я создал подкласс multiprocessing.Process, который реализует некоторые функции и перезаписывает метод run() для переноса нового метода saferun в блок try-catch. Для этого класса требуется инициализировать функцию обратной связи, которая используется для отправки информации, отладки, сообщений об ошибках родительской. Методы журнала в классе являются оболочками для глобально определенных функций журнала пакета:

    class EtlStepProcess(multiprocessing.Process):
    
    def __init__(self, feedback_queue):
        multiprocessing.Process.__init__(self)
        self.feedback_queue = feedback_queue
    
    def log_info(self, message):
        log_info(self.feedback_queue, message, self.name)
    
    def log_debug(self, message):
        log_debug(self.feedback_queue, message, self.name)
    
    def log_error(self, err):
        log_error(self.feedback_queue, err, self.name)
    
    def saferun(self):
        """Method to be run in sub-process; can be overridden in sub-class"""
        if self._target:
            self._target(*self._args, **self._kwargs)
    
    def run(self):
        try:
            self.saferun()
        except Exception as e:
            self.log_error(e)
            raise e
        return
    
  • Я подклассифицировал все мои другие шаги процесса из EtlStepProcess. Код, который должен быть запущен, реализуется в методе saferun(), а не выполняется. В этом случае мне не нужно добавлять вокруг него блок catch try, поскольку это уже выполняется методом run(). Пример:

    class MySqlWriter(EtlStepProcess):
    
    def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                 input_queue, feedback_queue):
        EtlStepProcess.__init__(self, feedback_queue)
        self.mysql_host = mysql_host
        self.mysql_user = mysql_user
        self.mysql_passwd = mysql_passwd
        self.mysql_schema = mysql_schema
        self.mysql_table = mysql_table
        self.columns = columns
        self.commit_count = commit_count
        self.input_queue = input_queue
    
    def saferun(self):
        self.log_info(self.name + " started")
        #create mysql connection
        engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
        meta = sqlalchemy.MetaData()
        table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
        connection = engine.connect()
        try:
            self.log_info("start MySQL insert")
            counter = 0
            row_list = []
            while True:
                next_row = self.input_queue.get()
                if isinstance(next_row, Terminator):
                    if counter % self.commit_count != 0:
                        connection.execute(table.insert(), row_list)
                    # Poison pill means we should exit
                    break
                row_list.append(next_row)
                counter += 1
                if counter % self.commit_count == 0:
                    connection.execute(table.insert(), row_list)
                    del row_list[:]
                    self.log_debug(self.name + ' ' + str(counter))
    
        finally:
            connection.close()
        return
    
  • В моем основном файле я отправляю процесс, который выполняет всю работу, и даёт ему feedback_queue. Этот процесс запускает все этапы, а затем считывает из mongoDB и помещает значения в начальную очередь. Мой основной процесс прослушивает очередь обратной связи и печатает все сообщения журнала. Если он получает журнал ошибок, он печатает ошибку и завершает ее дочерний элемент, который в свою очередь также прекращает все свои дочерние элементы перед смертью.

    if __name__ == '__main__':
    feedback_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
    p.start()
    
    while p.is_alive():
        fb = feedback_q.get()
        if fb["type"] == "error":
            p.terminate()
            print "ERROR in " + fb["process"] + "\n"
            for child in multiprocessing.active_children():
                child.terminate()
        else:
            print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                                                  fb["process"] + ": " + fb["message"]
    
    p.join()
    

Я думаю о том, чтобы сделать модуль из него и поставить его на github, но сначала мне нужно сначала очистить и комментировать.

Ответ 4

Решение @mrkwjc solution простое, его легко понять и реализовать, но у этого решения есть один недостаток. Когда у нас мало процессов, и мы хотим остановить все процессы, если какой-либо один процесс имеет ошибку, нам нужно подождать, пока все процессы не будут завершены, чтобы проверить, p.exception. Ниже приведен код, который устраняет эту проблему (т.е. когда один дочерний элемент имеет ошибку, мы прекращаем также другой дочерний элемент):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    /info/249845/python-multiprocessing-handling-child-errors-in-parent/1286169#1286169
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()