Подтвердить что ты не робот

Пул многопроцессорности Python висит при соединении?

Я пытаюсь запустить несколько python-кода на нескольких файлах параллельно. Конструкция в основном:

def process_file(filename, foo, bar, baz=biz):
    # do stuff that may fail and cause exception

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=psize)

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
    pool.close()
    pool.join()

Я ранее использовал pool.map, чтобы сделать что-то подобное, и он отлично поработал, но я не могу использовать это здесь, потому что pool.map не (кажется) не позволяет мне передавать дополнительные аргументы (и использование лямбда для этого не будет работать, потому что лямбда не может быть распределена).

Итак, теперь я пытаюсь заставить работу работать с помощью apply_async() напрямую. Моя проблема в том, что код, кажется, висит и никогда не выходит. Некоторые из файлов сбой исключаются, но я не понимаю, почему это может привести к сбою/зависанию соединения? Интересно, что если ни один из файлов не завершится с исключением, он действительно завершит работу.

Что мне не хватает?

Редактировать: Когда функция (и, следовательно, рабочий) терпит неудачу, я вижу это исключение:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

Если я вижу хотя бы один из них, процесс родительского процесса зависает вечно, не пожимая детей и не выходя из него.

4b9b3361

Ответ 1

Извините, что ответил на мой собственный вопрос, но я нашел хотя бы обходной путь, поэтому, если у кого-то еще есть аналогичная проблема, я хочу опубликовать его здесь. Я приму любые лучшие ответы там.

Я считаю, что корень проблемы - http://bugs.python.org/issue9400. Это говорит мне две вещи:

  • Я не сумасшедший, то, что я пытаюсь сделать, должен работать
  • По крайней мере, в python2 очень сложно, если не невозможно, заманить "исключения" обратно в родительский процесс. Простые работают, но многие другие не делают.

В моем случае моя рабочая функция запускала подпроцесс, который был segfault. Это возвратило исключение CalledProcessError, которое не является подходящим. По какой-то причине это приводит к тому, что объект пула в родителе выходит на обед и не возвращается из вызова join().

В моем конкретном случае мне все равно, что такое исключение. В лучшем случае я хочу зарегистрировать его и продолжить. Чтобы сделать это, я просто обертываю свою верхнюю рабочую функцию в предложение try/except. Если работник бросает какое-либо исключение, он улавливается, прежде чем пытаться вернуться к родительскому процессу, войти в систему, а затем рабочий процесс завершится нормально, поскольку он больше не пытается отправить исключение через. См. Ниже:

def process_file_wrapped(filenamen, foo, bar, baz=biz):
    try:
        process_file(filename, foo, bar, baz=biz)
    except:
        print('%s: %s' % (filename, traceback.format_exc()))

Затем у меня есть моя начальная функция вызова карты process_file_wrapped() вместо первоначальной. Теперь мой код работает по назначению.

Ответ 2

Фактически вы можете использовать экземпляр functools.partial вместо lambda в тех случаях, когда объект нужно мариновать. Объекты partial выбираются с Python 2.7 (и в Python 3).

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:])

Ответ 3

Для чего это стоит, у меня была подобная ошибка (не та же), когда pool.map висел. Мой случай разрешил мне использовать pool.terminate, чтобы решить проблему (убедитесь, что ваша делает так же, прежде чем менять материал).

Я использовал pool.map перед вызовом terminate, поэтому я знаю, что все закончилось, из docs:

Параллельный эквивалент встроенной функции map() (хотя он поддерживает только один итеративный аргумент). Он блокируется до тех пор, пока результат не будет готов.

Если это ваш вариант использования, это может быть способ его исправления.