Подтвердить что ты не робот

Python 3: обнаружение предупреждений при многопроцессорной обработке

Слишком долго; не читал

warnings.catch_warnings() контекстный менеджер не потокобезопасен. Как использовать его в среде параллельной обработки?

Фон

В приведенном ниже коде решается проблема максимизации с использованием параллельной обработки с помощью модуля Python multiprocessing. Он принимает список (неизменных) виджетов, разбивает их на разделы (см. Эффективная многопроцессорность массивной максимизации грубой силы в Python 3), находит максимумы ( "финалисты" ) из всех разделов, а затем находит максимум ( "чемпион" ) этих "финалистов". Если я правильно понимаю свой собственный код (и меня бы здесь не было, если бы я это сделал), я делюсь памятью со всеми дочерними процессами, чтобы предоставить им входные виджеты, а multiprocessing использует трубку уровня операционной системы и маринование, чтобы отправить финалистские виджеты обратно в основной процесс, когда рабочие сделаны.

Источник проблемы

Я хочу уловить избыточные предупреждения виджетов, вызванные повторной инстанцией виджетов после рассыпания, которое происходит, когда виджеты выходят из межпроцессного канала. Когда объекты виджета создают экземпляр, они проверяют свои собственные данные, испуская предупреждения из стандартного модуля warnings Python, чтобы сообщить пользователю приложения, что виджет подозревает, что есть проблема с входными данными пользователя. Поскольку unpickling заставляет объекты создавать экземпляр, мое понимание кода подразумевает, что каждый объект виджета реинсталлируется ровно один раз, если и только если он является финалистом после того, как он выходит из канала - см. Следующий раздел, чтобы понять, почему это неверно.

Виджеты уже были созданы до того, как они были frobnicated, поэтому пользователь уже мучительно осознает, какой вклад он получил неправильно, и не хочет снова слышать об этом. Это предупреждения, которые я хотел бы уловить с помощью менеджера контекста warnings module catch_warnings() (т.е. Инструкции with).

Неудавшиеся решения

В моих тестах я сузился, когда лишние предупреждения издаются в любом месте между тем, что я обозначил ниже как линия А и строка В. Меня удивляет то, что предупреждения высылаются в местах, отличных от ближайших output_queue.get(). Это означает, что multiprocessing отправляет виджеты рабочим, использующим травление.

Результатом является то, что создание менеджера контекста, созданного warnings.catch_warnings() даже вокруг всего, от линии А до строки В и установки правильных предупреждений фильтр внутри этого контекста не улавливает предупреждения. Это означает, что предупреждения производятся в рабочих процессах. Помещение этого менеджера контекста вокруг рабочего кода также не вызывает предупреждений.

Код

В этом примере отсутствует код для определения того, является ли размер проблемы слишком малым, чтобы беспокоиться о процессах forking, импортировать многопроцессорность и определять my_frobnal_counter и my_load_balancer.

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)
4b9b3361

Ответ 1

вы можете попробовать переопределить метод Process.run для использования warnings.catch_warnings.

>>> from multiprocessing import Process
>>> 
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
... 
>>> class CustomProcess(Process):
...    def run(self, *args, **kwargs):
...       import warnings
...       with warnings.catch_warnings():
...          warnings.simplefilter("ignore")
...          return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    quiet = CustomProcess(target=yell, args=('...not!',))
...    quiet.start()
...    quiet.join()
...    noisy = Process(target=yell, args=('AAAAAAaaa!',))
...    noisy.start()
...    noisy.join()
... 
about to yell ...not!
about to yell AAAAAAaaa!
__main__:4: UserWarning: AAAAAAaaa!
>>> 

или вы можете использовать некоторые внутренние элементы... (__warningregistry__)

>>> from multiprocessing import Process
>>> import exceptions
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
...    # not filtered
...    warnings.warn('complimentary second warning.')
... 
>>> WARNING_TEXT = 'AAAAaaaaa!'
>>> WARNING_TYPE = exceptions.UserWarning
>>> WARNING_LINE = 4
>>> 
>>> class SelectiveProcess(Process):
...    def run(self, *args, **kwargs):
...       registry = globals().setdefault('__warningregistry__', {})
...       registry[(WARNING_TEXT, WARNING_TYPE, WARNING_LINE)] = True
...       return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
...    p.start()
...    p.join()
... 
about to yell AAAAaaaaa!
__main__:6: UserWarning: complimentary second warning.
>>> 

Ответ 2

Распаковка не приведет к тому, что __init__ будет выполняться дважды. Я запускал следующий код в Windows, и этого не происходит (каждый __init__ запускается ровно один раз).

Следовательно, вам необходимо предоставить нам код из my_load_balancer и класса widgets. На данный момент ваш вопрос просто не предоставляет достаточной информации.

Как случайное предположение, вы можете проверить, будет ли my_load_balancer делать копии виджетов, заставляя их снова создавать экземпляр.

import multiprocessing
import collections

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def my_load_balancer(widgets):
    partitions = tuple(set() for _ in range(8))
    for i, widget in enumerate(widgets):
        partitions[i % 8].add(widget)
    for partition in partitions:
        yield partition

def my_frobnal_counter(widget):
    return widget.id

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

class Widget:
    id = 0
    def __init__(self):
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)

def main():

    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)


if __name__ == '__main__':
    main()