Подтвердить что ты не робот

Исключение, вызванное многопроцессорностью Пул не обнаружен

Похоже, что когда возникает исключение из процесса multiprocessing.Pool, нет трассировки стека или каких-либо других признаков того, что она потерпела неудачу. Пример:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

печатает 1 и останавливается молча. Интересно, что вместо этого работает BaseException. Есть ли способ сделать поведение для всех исключений таким же, как BaseException?

4b9b3361

Ответ 1

У меня есть разумное решение проблемы, по крайней мере для целей отладки. В настоящее время у меня нет решения, которое восстановят исключение в основных процессах. Моя первая мысль заключалась в том, чтобы использовать декоратор, но вы можете только раскрыть функции, определенные на верхнем уровне модуля, так что прямо.

Вместо этого простой класс упаковки и подкласс Pool, который использует это для apply_async (и, следовательно, apply). Я оставлю map_async как упражнение для читателя.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

Это дает мне:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception

Ответ 2

Возможно, я что-то упустил, но не так ли, что возвращает метод get объекта Result? См. Пулы процессов.

класс multiprocessing.pool.AsyncResult

Класс результата, возвращаемого Pool.apply_async() и Pool.map_async(). get ([timeout])
Верните результат, когда он придет. Если тайм-аут не является ничем, и результат не приходит таймаут секунд, а затем многопроцессорность .TimeoutError поднят. Если пульт дистанционного управления вызов вызывает исключение, тогда это исключение будет ререйзироваться с помощью get().

Итак, слегка изменив ваш пример, можно сделать

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

Что дает результат

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

Это не вполне удовлетворительно, так как он не печатает трассировку, но лучше, чем ничего.

UPDATE: эта ошибка была исправлена ​​в Python 3.4, любезно предоставлена ​​Ричардом Оудкерком. См. Вопрос получить метод multiprocessing.pool.Async должен вернуть полную трассировку.

Ответ 3

Решение с большинством голосов на момент написания статьи имеет проблему:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

Как отметил @dfrankow, он будет ждать x.get(), что разрушает точку запуска задачи асинхронно. Таким образом, для повышения эффективности (особенно если ваша рабочая функция go занимает много времени), я бы изменил ее на:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

Преимущества: рабочая функция запускается асинхронно, поэтому, если, например, вы запускаете много задач на нескольких ядрах, это будет намного более эффективно, чем исходное решение.

Недостатки: если в рабочей функции есть исключение, она будет только поднята после, пул выполнит все задачи. Это может быть или не быть желательным. EDITED в соответствии с комментарием @colinfang, который исправил это.

Ответ 4

У меня были успешные протоколирующие исключения с этим декоратором:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

с кодом в вопросе, это

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

Просто украсьте функцию, которую вы передаете в пул процессов. Ключом к этой работе является @functools.wraps(func), иначе многопроцессор выдает a PicklingError.

приведенный выше код дает

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception

Ответ 5

Я создал модуль RemoteException.py, который показывает полную отслеживание исключения в процессе. Python2. Скачайте и добавьте это в свой код:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here

Ответ 6

import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()

Ответ 7

Я бы попытался использовать pdb:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler

Ответ 8

Поскольку вы использовали apply_sync, я предполагаю, что в случае использования требуется выполнить некоторые задачи синхронизации. Использовать обратный вызов для обработки - это еще один вариант. Обратите внимание, что этот параметр доступен только для python3.2 и выше и недоступен на python2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()

Ответ 9

Поскольку для multiprocessing.Pool уже имеются достойные ответы, я предоставил решение с использованием другого подхода для полноты.

Для python >= 3.2 следующее решение кажется самым простым:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Преимущества:

  • очень маленький код
  • вызывает исключение в основном процессе
  • предоставляет трассировку стека
  • нет внешних зависимостей

Для получения дополнительной информации об API, пожалуйста, проверьте: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Кроме того, если вы отправляете большое количество задач, и вы хотите, чтобы ваш основной процесс завершился с ошибкой, как только одна из ваших задач выйдет из строя, вы можете использовать следующий фрагмент:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

Все остальные ответы завершаются только после выполнения всех задач.