Подтвердить что ты не робот

Использование многопроцессорных труб python

Я пытаюсь написать класс, который будет вычислять контрольные суммы с использованием нескольких процессов, тем самым используя несколько ядер. У меня для этого довольно простой класс, и он отлично работает при выполнении простого случая. Но всякий раз, когда я создаю два или более экземпляра класса, рабочий никогда не выходит. Похоже, он никогда не получает сообщение о том, что труба была закрыта родителем.

Весь код можно найти ниже. Сначала я вычисляю контрольные суммы md5 и sha1, которые работают, а затем я пытаюсь выполнить вычисление параллельно, а затем программа блокируется, когда пришло время закрыть канал.

Что здесь происходит? Почему трубы не работают, как я ожидаю? Наверное, я мог бы сделать обходной путь, отправив сообщение "Остановить" в очереди и заставив ребенка выйти из этого способа, но мне очень хотелось бы знать, почему это не работает так, как есть.

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

PS. Эта проблема решена. Вот рабочая версия вышеуказанного кода, если кому-то интересно:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()
4b9b3361

Ответ 1

Да, это действительно удивительное поведение.

Однако, если вы посмотрите на вывод lsof для двух параллельных дочерних процессов, то легко заметить, что во втором дочернем процессе открыто больше дескрипторов файлов.

Что происходит, когда при запуске двух параллельных дочерних процессов второй ребенок наследует трубы родителя, так что, когда родитель вызывает self.parent_conn.close(), второй ребенок все еще имеет этот дескриптор файла pipe, так что описание файла канала не закрывается в ядре (количество ссылок больше 0), при этом эффект self.child_conn.recv_bytes() в первом параллельном дочернем процессе никогда read() EOF и EOFError никогда не бросается.

Вам может потребоваться отправить явное сообщение о закрытии, а не просто закрыть дескрипторы файлов, потому что, похоже, мало кто контролирует, какие файловые дескрипторы получают общий доступ между этими процессами (отсутствует флаг дескриптора файла close-on-fork).