Подтвердить что ты не робот

Multiprocessing.Pool зависает, если дочерний элемент вызывает ошибку сегментации

Я хочу применить функцию параллельно с использованием multiprocessing.Pool. Проблема в том, что если один вызов функции вызывает ошибку сегментации, пул вешает навсегда. Кто-нибудь знает, как я могу создать пул, который обнаруживает, когда что-то подобное происходит и вызывает ошибку?

В следующем примере показано, как воспроизвести его (требуется scikit-learn > 0.14)

import numpy as np
from sklearn.ensemble import gradient_boosting
import time

from multiprocessing import Pool

class Bad(object):
    tree_ = None


def fit_one(i):
    if i == 3:
        # this will segfault                                                    
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i


pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
    print o
4b9b3361

Ответ 1

Вместо использования Pool().imap() возможно, вам лучше вручную создать дочерние процессы с помощью Process(). Я уверен, что возвращенный объект позволит вам получить статус жизни любого ребенка. Вы узнаете, вешают ли они.

Ответ 2

Это известная ошибка, проблема # 22393, на Python. Нет значимого обходного пути, пока вы используете multiprocessing.pool до тех пор, пока он не будет исправлен. Патч доступен по этой ссылке, но он еще не интегрирован в основную версию, поэтому стабильная версия Python не устраняет проблему.

Ответ 3

Я не запускаю ваш пример, чтобы узнать, может ли он обрабатывать эту ошибку, но попытайтесь выполнить параллельные фьючерсы. Просто замените my_function (i) вашей fit_one (i). Сохраните структуру __name__=='__main__':. Кажется, что это сопряжено с параллельными фьючерсами. Код ниже проверяется на моей машине, поэтому мы надеемся, что работаем прямо на вашем компьютере.

import concurrent.futures

def my_function(i):
    print('function running')
    return i

def run():
    number_processes=4
    executor = concurrent.futures.ProcessPoolExecutor(number_processes)
    futures = [executor.submit(my_function,i) for i in range(10)]
    concurrent.futures.wait(futures)

    for f in futures:
        print(f.result())

if __name__ == '__main__':
    run()

Ответ 4

Как описано в комментариях, это просто работает в Python 3, если вы используете concurrent.Futures.ProcessPoolExecutor вместо multiprocessing.Pool.

Если вы застряли на Python 2, лучшим вариантом, который я нашел, является использование аргумента timeout для объектов результатов, возвращаемых Pool.apply_async и Pool.map_async. Например:

pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
    print o.get(timeout=1000)  # allow 1000 seconds max

Это работает до тех пор, пока у вас есть верхняя граница того, как долго должен выполняться дочерний процесс для выполнения задачи.