Я делаю аудиоплеер, который получил образцы из сокета udp, и все работает нормально. Но когда я реализовал алгоритм Lost Conscalment, игроку не удалось продлить молчание по исключенному курсу (каждые 10 мс отправляют список из 160 байтов).
При воспроизведении звука с помощью pyaudio, используя запись блокирующего вызова, чтобы воспроизвести некоторые образцы, я заметил, что он заблокирован в среднем по длительности образца. Поэтому я создал новый выделенный процесс для воспроизведения образцов.
Основной процесс обрабатывает выходной поток аудио и отправляет результат этому процессу с использованием многопроцессорной обработки. Я решил использовать multprocessing.Pipe, потому что он должен был быть быстрее, чем другие способы.
К сожалению, когда я запускал программу на виртуальной машине, битрейт был половиной того, что я получал на своем быстром ПК, что не соответствовало целевому битрейту.
После некоторых тестов я пришел к выводу, что причиной задержки является функция Pipe send
.
Я сделал простой тест script (см. ниже), чтобы увидеть различия между различными способами передачи в процесс. script, постоянно отправляет [b'\x00'*160]
в течение 5 секунд и подсчитывает, сколько байтов байтового объекта было отправлено в общей сложности. Я протестировал следующие методы отправки: "не отправлять", многопроцессорность. Протокол, многопроцессорность. Queue, multiprocessing.Manager, multiprocessing.Listener/Client и, наконец, socket.socket:
Результаты для моего "быстрого" рабочего стола ПК 7 x64:
test_empty : 1516076640
test_pipe : 58155840
test_queue : 233946880
test_manager : 2853440
test_socket : 55696160
test_named_pipe: 58363040
Результаты для гостевой системы VirtualBox VM, работающей под управлением Windows 7 x64, хост с Windows 7 x64:
test_empty : 1462706080
test_pipe : 32444160
test_queue : 204845600
test_manager : 882560
test_socket : 20549280
test_named_pipe: 35387840
Script используется:
from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time
FS = "{:<15}:{:>15}"
def test_empty():
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
sent += len(data)
if time.time()-s >= 5:
break
print(FS.format("test_empty", sent))
def pipe_void(pipe_in):
while True:
msg = pipe_in.recv()
if msg == []:
break
def test_pipe():
pipe_out, pipe_in = Pipe()
p = Process(target=pipe_void, args=(pipe_in,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
pipe_out.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
pipe_out.send([])
p.join()
print(FS.format("test_pipe", sent))
def queue_void(q):
while True:
msg = q.get()
if msg == []:
break
def test_queue():
q = Queue()
p = Process(target=queue_void, args=(q,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
q.put(lst)
sent += len(data)
if time.time()-s >= 5:
break
q.put([])
p.join()
print(FS.format("test_queue", sent))
def manager_void(l, lock):
msg = None
while True:
with lock:
if len(l) > 0:
msg = l.pop(0)
if msg == []:
break
def test_manager():
with Manager() as manager:
l = manager.list()
lock = manager.Lock()
p = Process(target=manager_void, args=(l, lock))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
with lock:
l.append(lst)
sent += len(data)
if time.time()-s >= 5:
break
with lock:
l.append([])
p.join()
print(FS.format("test_manager", sent))
def socket_void():
addr = ('127.0.0.1', 20000)
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_socket():
addr = ('127.0.0.1', 20000)
listener = Listener(addr, "AF_INET")
p = Process(target=socket_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_socket", sent))
def named_pipe_void():
addr = '\\\\.\\pipe\\Test'
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_named_pipe():
addr = '\\\\.\\pipe\\Test'
listener = Listener(addr, "AF_PIPE")
p = Process(target=named_pipe_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_named_pipe", sent))
if __name__ == "__main__":
test_empty()
test_pipe()
test_queue()
test_manager()
test_socket()
test_named_pipe()
Вопрос
- Если Queue использует Pipe, как это происходит быстрее, чем Pipe в этом контексте? Это противоречит вопросу Многопроцессорность Python - Pipe vs Queue
- Как я могу гарантировать постоянный поток битрейта из процесса в другой, имея небольшую задержку отправки?
Обновление 1
Внутри моей программы, после тестирования с очередями вместо Pipes. Я получил огромный импульс.
На моем компьютере, используя Pipes, я получил + - 16000 B/s, используя Queues я получил + -7,5 миллионов B/s. На виртуальной машине я получил от -13000 B/с до 6,5 миллионов B/с. Это примерно в 500 раз больше байтов, используя Queue instread of Pipe.
Конечно, я не буду играть миллионы байт в секунду, я буду играть только обычную скорость звука. (в моем случае 16000 B/s, совпадение со значением выше).
Но дело в том, что я могу ограничить скорость до того, что хочу, но еще успеваю закончить другие вычисления (например, получать из сокетов, применяя звуковые алгоритмы и т.д.).