Подтвердить что ты не робот

Пул потоков Python, который обрабатывает исключения

Я искал хорошую реализацию простого пула пула пула python и действительно не могу найти ничего, что бы соответствовало моим потребностям. Я использую python 2.7, и все модули, которые я нашел, либо не работают, либо не обрабатывают исключения в рабочих правильно. Мне было интересно, знал ли кто-нибудь о библиотеке, которая могла бы предложить тип функциональности, которую я ищу. Помогите с благодарностью.

Multiprocessing

Моя первая попытка заключалась в встроенном модуле multiprocessing, но поскольку это не использует потоки, а подпроцессы, вместо этого мы сталкиваемся с проблемой, что объекты не могут быть маринованными. Не идите сюда.

from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Фьючерс

Итак, я вижу, что есть обратный порт некоторых из крутых параллельных функций python 3.2 здесь. Это кажется идеальным и простым в использовании. Проблема в том, что когда вы получаете исключение в одном из рабочих, вы получаете только тип исключения, такого как "ZeroDivisionError", но не трассируетесь и, следовательно, не указали, какая строка вызвала исключение. Код становится невозможным для отладки. Нет.

from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero

Workerpool

Я нашел другую реализацию этого шаблона здесь. На этот раз, когда возникает исключение, оно печатается, но тогда мой интерактивный интерпретатор ipython остается в зависании и его нужно убить из другой оболочки. Нет.

import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783

Threadpool

Еще одна реализация здесь. На этот раз, когда возникает исключение, оно печатается на stderr, но script не прерывается и вместо этого продолжает выполнение, что не соответствует цели исключения и может сделать вещи небезопасными. Все еще не используется.

import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'

- Обновление -

Похоже, что в отношении библиотеки futures поведение python 3 не совпадает с python 2.

futures_exceptions.py:

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures): print(future)

Выход Python 2.7.6:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero

Выход Python 3.3.2:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
4b9b3361

Ответ 2

Простое решение: используйте любую альтернативу, которая вам подходит, и реализуйте свой собственный блок try-except для ваших работников. Если вы хотите, вызовите корневой вызов.

Я бы не сказал, что эти библиотеки обрабатывают исключения "неправильно". Они имеют поведение по умолчанию, но примитивное. Вы должны будете справиться с этим сами, если по умолчанию вас не устраивает.