Подтвердить что ты не робот

Соединитель ZeroMQ PUB буферизует все мои исходящие данные, когда он подключается

Я заметил, что сокет zeromq PUB будет буферизировать все исходящие данные, если он подключается, например

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

Подвязки после этих сообщений, они должны быть отброшены, потому что PUB должен отбрасывать сообщения, если с ним никто не подключался. Но вместо того, чтобы отбрасывать сообщения, он буферизует все сообщения.

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

Как вы можете видеть, те, что "сообщение не следует отбрасывать", буферизуются сокетом, как только он подключается, он выдает их в SUB-сокет. Если я привяжусь к сокету PUB и подключусь к гнезду SUB, он будет работать правильно.

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

И вы можете видеть только вывод

'hi'

Такое странное поведение вызывает проблему, оно буферизует все данные в соединительном сокете, у меня есть два сервера, сервер A публикует данные на сервер B

Server A -- publish --> Server B

Он отлично работает, если сервер B подключается к сети. Но что, если я запустил сервер A и не запустил сервер B?

В результате соединительный разъем PUB на сервере A сохраняет все эти данные, использование памяти становится все выше и выше.

Вот проблема, является ли это поведение ошибкой или функцией? Если это функция, где я могу найти документ, который упоминает это поведение? И как я могу остановить подключаемый сокет PUB для всех данных?

Спасибо.

4b9b3361

Ответ 1

Является ли сокет блокирует или отбрасывает сообщения, зависит от типа сокета, как описано в Документация ZMQ:: Socket (акцент ниже мой):

ZMQ:: HWM: получить отметку о высокой воде

Опция ZMQ:: HWM должна получить знак высокой воды для указанный сокет. Знак высокой воды является жестким пределом максимума количество выдающихся сообщений 0MQ должно стоять в очереди в памяти для любого одиночный одноранговый узел, с которым связан указанный сокет.

Если этот предел достигнут, гнездо должно ввести исключительный и в зависимости от типа сокета, 0MQ должен принять соответствующие действие, такое как блокирование или удаление отправленных сообщений. Обратитесь к отдельные описания гнезд в ZMQ:: Socket для получения подробной информации о точном действие для каждого типа сокета.

Значение ZMQ:: HWM по умолчанию, равное нулю, означает "без ограничений".

Вы можете увидеть, будет ли он блокироваться или пропадать, просмотрев документацию для типа сокета для ZMQ::HWM option action, которая будет либо Block, либо Drop.

Действие для ZMQ::PUB равно Drop, поэтому, если оно не падает, вы должны проверить значение HWM (High Water Mark) и прислушаться к предупреждению о том, что Значение ZMQ:: HWM по умолчанию, равное нулю, означает "no limit", означает, что он не войдет в исключительное состояние до тех пор, пока система не исчерпает память (в какой момент я не знаю, как она себя ведет).

Ответ 2

Я считаю, что это поведение является семантикой zmq_connect(). То есть: , когда zmq_connect() возвращает успех, тогда соединение концептуально установлено, и, таким образом, ваше соединение-PUB запускает сообщение о очередности, вместо того чтобы отбрасывать.

Ниже приведена отрывок из ZMQ Guide":

В теории с сокетами ØMQ не имеет значения, какой конец соединяется, и который заканчивается. Однако с гнездами PUB-SUB, если вы связываете SUB сокет и подключить разъем PUB, гнездо SUB может принимать старые сообщения, то есть сообщения, отправленные до запуска SUB. Это артефакт пути bind/connect работает. Лучше всего связать PUB и подключите SUB, если сможете.

Следующий раздел в zmq_connect() содержит некоторые подсказки, показанные ниже:

Основные отличия от обычных сокетов

Вообще говоря, обычные сокеты представляют собой синхронные интерфейс для ориентированных на соединение надежных байтовых потоков (SOCK_STREAM) или ненадежных дейтаграмм без подключения (SOCK_DGRAM). Для сравнения, сокеты ØMQ представляют собой абстракцию асинхронного очереди сообщений с точной семантикой очередей в зависимости от тип гнезда в использовании. Когда обычные сокеты передают потоки байтов или дискретных дейтаграмм, сокеты ØMQ передают дискретные сообщения.

Соединители ØMQ, являющиеся асинхронными, означают, что тайминги физического настройка соединения и срыв, повторное подключение и эффективная доставка прозрачный для пользователя и организованный самим ØMQ. Кроме того, сообщения может быть поставлена ​​в очередь в случае, если одноранговое соединение недоступно для их получения.

Ответ 3

Они устанавливают опцию HWM в сокете.

Ответ 4

Итак, bind() и connect() приводят к двум различным типам поведения. Почему бы вам не выбрать, какой из них вы предпочитаете (похоже, bind()) и использовать это?

Это действительно функция ZeroMQ в целом, что она буферизует исходящие сообщения до тех пор, пока не будет установлено соединение.

Ответ 5

Вы должны иметь возможность установить знак высокой воды в гнезде, используя hwm, устанавливающий паб-паз. Он позволяет определить, сколько сообщений сохраняется.

Ответ 6

Вот хак, который может помочь...

Установите ZMQ::HWM на фиксированное число, скажем 10. После подключения вызовите метод socket recv в цикле, пока он не сбросит все буферизованные сообщения, и только THEN начнет ваш основной цикл приема.