Подтвердить что ты не робот

Python 3.4 многопроцессорная очередь быстрее, чем Pipe, неожиданно

Я делаю аудиоплеер, который получил образцы из сокета udp, и все работает нормально. Но когда я реализовал алгоритм Lost Conscalment, игроку не удалось продлить молчание по исключенному курсу (каждые 10 мс отправляют список из 160 байтов).

При воспроизведении звука с помощью pyaudio, используя запись блокирующего вызова, чтобы воспроизвести некоторые образцы, я заметил, что он заблокирован в среднем по длительности образца. Поэтому я создал новый выделенный процесс для воспроизведения образцов.

Основной процесс обрабатывает выходной поток аудио и отправляет результат этому процессу с использованием многопроцессорной обработки. Я решил использовать multprocessing.Pipe, потому что он должен был быть быстрее, чем другие способы.

К сожалению, когда я запускал программу на виртуальной машине, битрейт был половиной того, что я получал на своем быстром ПК, что не соответствовало целевому битрейту.

После некоторых тестов я пришел к выводу, что причиной задержки является функция Pipe send.

Я сделал простой тест script (см. ниже), чтобы увидеть различия между различными способами передачи в процесс. script, постоянно отправляет [b'\x00'*160] в течение 5 секунд и подсчитывает, сколько байтов байтового объекта было отправлено в общей сложности. Я протестировал следующие методы отправки: "не отправлять", многопроцессорность. Протокол, многопроцессорность. Queue, multiprocessing.Manager, multiprocessing.Listener/Client и, наконец, socket.socket:

Результаты для моего "быстрого" рабочего стола ПК 7 x64:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

Результаты для гостевой системы VirtualBox VM, работающей под управлением Windows 7 x64, хост с Windows 7 x64:

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

Script используется:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

Вопрос

  • Если Queue использует Pipe, как это происходит быстрее, чем Pipe в этом контексте? Это противоречит вопросу Многопроцессорность Python - Pipe vs Queue
  • Как я могу гарантировать постоянный поток битрейта из процесса в другой, имея небольшую задержку отправки?

Обновление 1

Внутри моей программы, после тестирования с очередями вместо Pipes. Я получил огромный импульс.

На моем компьютере, используя Pipes, я получил + - 16000 B/s, используя Queues я получил + -7,5 миллионов B/s. На виртуальной машине я получил от -13000 B/с до 6,5 миллионов B/с. Это примерно в 500 раз больше байтов, используя Queue instread of Pipe.

Конечно, я не буду играть миллионы байт в секунду, я буду играть только обычную скорость звука. (в моем случае 16000 B/s, совпадение со значением выше).
Но дело в том, что я могу ограничить скорость до того, что хочу, но еще успеваю закончить другие вычисления (например, получать из сокетов, применяя звуковые алгоритмы и т.д.).

4b9b3361

Ответ 1

Я не могу сказать точно, но я думаю, что проблема, с которой вы имеете дело, - синхронный или асинхронный ввод-вывод. Я предполагаю, что труба каким-то образом заканчивается синхронно, и очередь заканчивается асинхронной. Почему именно один по умолчанию имеет один способ, а другой - другой, на который может ответить этот вопрос и ответ:

Синхронное/асинхронное поведение Python-каналов