Подтвердить что ты не робот

Как подключить отладчик к подпроцессу python?

Мне нужно отладить дочерний процесс, порожденный multiprocessing.Process(). Дегуатор pdb, похоже, не знает о разветвлении и не может подключиться к уже запущенным процессам.

Есть ли более умные отладчики python, которые можно привязать к подпроцессу?

4b9b3361

Ответ 1

Winpdb - это в значительной степени определение более умного отладчика Python. Он явно поддерживает идущий вниз по вилке, не уверен, что он отлично работает с multiprocessing.Process(), но стоит попробовать.

Список кандидатов для проверки вашего варианта использования см. в списке Python Debuggers в вики.

Ответ 2

Я искал простое решение этой проблемы и придумал следующее:

import sys
import pdb

class ForkedPdb(pdb.Pdb):
    """A Pdb subclass that may be used
    from a forked multiprocessing child

    """
    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        try:
            sys.stdin = open('/dev/stdin')
            pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin

Используйте его так же, как вы можете использовать классический Pdb:

ForkedPdb().set_trace()

Ответ 3

Это разработка ответа Romuald, который восстанавливает исходный stdin, используя его файловый дескриптор. Это приводит к тому, что readline работает внутри отладчика. Кроме того, специальное управление KeyboardInterrupt pdb отключено, чтобы не мешать многопроцессорному обработчику синтаксиса.

class ForkablePdb(pdb.Pdb):

    _original_stdin_fd = sys.stdin.fileno()
    _original_stdin = None

    def __init__(self):
        pdb.Pdb.__init__(self, nosigint=True)

    def _cmdloop(self):
        current_stdin = sys.stdin
        try:
            if not self._original_stdin:
                self._original_stdin = os.fdopen(self._original_stdin_fd)
            sys.stdin = self._original_stdin
            self.cmdloop()
        finally:
            sys.stdin = current_stdin

Ответ 4

Основываясь на идее @memplex, мне пришлось изменить ее, чтобы она работала с joblib, установив sys.stdin в конструкторе, а также передав его напрямую через joblib.

import os
import pdb
import signal
import sys
import joblib

_original_stdin_fd = None

class ForkablePdb(pdb.Pdb):
    _original_stdin = None
    _original_pid = os.getpid()

    def __init__(self):
        pdb.Pdb.__init__(self)
        if self._original_pid != os.getpid():
            if _original_stdin_fd is None:
                raise Exception("Must set ForkablePdb._original_stdin_fd to stdin fileno")

            self.current_stdin = sys.stdin
            if not self._original_stdin:
                self._original_stdin = os.fdopen(_original_stdin_fd)
            sys.stdin = self._original_stdin

    def _cmdloop(self):
        try:
            self.cmdloop()
        finally:
            sys.stdin = self.current_stdin

def handle_pdb(sig, frame):
    ForkablePdb().set_trace(frame)

def test(i, fileno):
    global _original_stdin_fd
    _original_stdin_fd = fileno
    while True:
        pass    

if __name__ == '__main__':
    print "PID: %d" % os.getpid()
    signal.signal(signal.SIGUSR2, handle_pdb)
    ForkablePdb().set_trace()
    fileno = sys.stdin.fileno()
    joblib.Parallel(n_jobs=2)(joblib.delayed(test)(i, fileno) for i in range(10))

Ответ 5

У меня была идея создать "фиктивные" классы, чтобы имитировать реализацию методов, которые вы используете из многопроцессорной обработки:

from multiprocessing import Pool


class DummyPool():
    @staticmethod
    def apply_async(func, args, kwds):
        return DummyApplyResult(func(*args, **kwds))

    def close(self): pass
    def join(self): pass


class DummyApplyResult():
    def __init__(self, result):
        self.result = result

    def get(self):
        return self.result


def foo(a, b, switch):
    # set trace when DummyPool is used
    # import ipdb; ipdb.set_trace()
    if switch:
        return b - a
    else:
        return a - b


if __name__ == '__main__':
    xml = etree.parse('C:/Users/anmendoza/Downloads/jim - 8.1/running-config.xml')
    pool = DummyPool()  # switch between Pool() and DummyPool() here
    results = []
    results.append(pool.apply_async(foo, args=(1, 100), kwds={'switch': True}))
    pool.close()
    pool.join()
    results[0].get()

Ответ 6

Если вы находитесь на поддерживаемой платформе, попробуйте DTrace. Большинство семейств BSD/Solaris/OS X поддерживают DTrace.

Вот вступление автора. Вы можете использовать Dtrace для отладки практически всего.

Вот сообщение SO об обучении DTrace.