Подтвердить что ты не робот

Реализация многопроцессорной обработки .Queue и queue.Queue

Я ищу дополнительные сведения о реализациях очередей в Python, чем я могу найти в документации.

Из того, что я понял, и извините мое невежество, если я ошибаюсь в этом:

queue.Queue(): реализуется с помощью базовых массивов в памяти и поэтому не может использоваться совместно несколькими процессами, но может использоваться совместно между потоками. Пока что так хорошо.

multiprocessing.Queue(): реализуется через трубы (man 2 pipes), которые имеют ограничение по размеру (довольно крошечное: в Linux, man 7 pipe говорит 65536 невербально):

Начиная с Linux 2.6.35, емкость по умолчанию составляет 65536 байт, но емкость может быть запрошена и задана с помощью операций fcntl(2) F_GETPIPE_SZ и F_SETPIPE_SZ

Но, в Python, всякий раз, когда я пытаюсь записать в канал данные размером более 65536 байтов, он работает без исключения - я мог бы наводнить мою память следующим образом:

import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1,70000):
        result += ","+str(i)
    return result # 408888 bytes string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()
    while True:
        sleep(1) # No pipe consumption, we just want to flood the pipe

Итак, вот мои вопросы:

  • Python настраивает ограничение на канал? если да, то насколько? Исходный код Python приветствуется.

  • Связываются ли Python сообщения, взаимодействующие с другими процессами, отличными от Python? Если да, приветствуются рабочие примеры (желательно JS) и ссылки ресурсов.

4b9b3361

Ответ 1

Почему q.put() не блокирует?

mutiprocessing.Queue создает канал, который блокирует, если труба уже заполнена. Разумеется, запись больше, чем пропускная способность канала вызовет вызов write, пока считывающее устройство не очистит достаточное количество данных. Итак, если труба блокируется при достижении ее пропускной способности, почему q.put() не блокируется, когда труба заполнена? Даже первый вызов q.put() в этом примере должен заполнить трубку, и все должно блокироваться там, нет?

Нет, он не блокируется, , потому что реализация multiprocessing.Queue отделяет метод .put() от записи в канал. Метод .put() помещает данные, переданные ему во внутреннем буфере, и есть отдельный поток, который заряжается чтением из этого буфера и записывается в трубу. Этот поток будет блокироваться, когда труба будет заполнена, но это не помешает .put() откопать больше данных во внутренний буфер.

Реализация . put() сохраняет данные в self._buffer и замечает, как он запускает поток, если его еще нет работает:

def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

Метод ._feed() - это то, что читается из self._buffer и передает данные в канал. И ._start_thread() - это то, что устанавливает поток, который запускает ._feed().

Как ограничить размер очереди?

Если вы хотите ограничить, сколько данных может быть записано в очередь, я не вижу способа сделать это, указав количество байтов, но вы можете ограничить количество элементов, которые хранятся во внутреннем буфере, в любой раз, передав число до multiprocessing.Queue:

q = multiprocessing.Queue(2)

Когда я использую указанный выше параметр и использую ваш код, q.put() будет выставлять в очередь два элемента и будет блокироваться при третьей попытке.

Являются ли связи с Python-каналами взаимозависимыми с другими процессами, отличными от Python?

Это зависит. Средства, предоставляемые модулем multiprocessing, нелегко взаимодействуют с другими языками. Я ожидаю, что можно будет multiprocessing взаимодействовать с другими языками, но достижение этой цели было бы крупным предприятием. Модуль написан с ожиданием, что задействованные процессы запускают код Python.

Если вы посмотрите на более общие методы, тогда ответ будет да. Вы можете использовать сокет как коммуникационный канал между двумя разными процессами. Например, процесс JavaScript, который читается из именованного сокета:

var net = require("net");
var fs = require("fs");

sockPath = "/tmp/test.sock"
try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
  stream.on("data", function(c) {
    console.log("received:", c.toString());
  });

  stream.on("end", function() {
    server.close();
  });
});

server.listen(sockPath);

И процесс Python, который пишет ему:

import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)

Если вы хотите попробовать это, сначала нужно запустить сторону JavaScript, чтобы на стороне Python было что-то писать. Это доказательство концепции. Для полного решения потребуется больше пользы.

Чтобы передать сложные структуры из Python на другие языки, вам нужно будет найти способ сериализации данных в формате, который можно прочитать с обеих сторон. Соленые огурцы, к сожалению, специфичны для Python. Я обычно выбираю JSON всякий раз, когда мне нужно сериализовать между языками или использовать специальный формат, если JSON этого не сделает.