Подтвердить что ты не робот

Потерянных сообщений на zeromq pub sub

Я пытаюсь реализовать шаблон паб-проекта, используя структуру zeromq. Идея состоит в том, чтобы запустить подписчика, а затем запустить издателя. Абонент будет слушать 100 сообщений, и издатель опубликует 100 сообщений. Все идет нормально... Однако на самом деле происходит то, что даже если абонент уже запущен и запущен при запуске издателя, не все сообщения принимаются подписчиком (100 сообщений будут подняты подписчиком, если издатель отправит не менее 500 сообщение). Похоже, что первые сообщения, отправленные издателем, не отправляются подписчику.

Любые идеи?

Спасибо заранее, Омер.

Код подписчика (запущен перед издателем)

int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    zmq::message_t update;
    subscriber.recv(&update);
    i++;
    std::cout<<"receiving  :"<<i<<std::endl;
}

Код издателя (запущенный после подписчика)

zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");

int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    //  Send message to all subscribers
    zmq::message_t request (20);

    time_t seconds;
    seconds = time (NULL);

    char update [20]="";
    sprintf (update, "%ld", seconds);

    memcpy ((void *) request.data (), update,strlen(update));
    publisher.send(request);
    i++;
    std::cout << "sending :" << i << std::endl;

}

4b9b3361

Ответ 1

Смотрите http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver и найдите "медленный столяр" на этой веб-странице.

В принципе, для установления соединения требуется немного времени (несколько миллисекунд), и за это время может быть потеряно много сообщений. Издателю нужно немного поспать, прежде чем начать публиковать, или (лучше) ему нужно явно синхронизировать с подписчиком.

Ответ 2

В 0MQ успешная функция send() не означает, что данные отправляются сразу по сети. http://api.zeromq.org/2-1:zmq-send. Ваши сообщения довольно малы, и AFAIR 0MQ делает некоторую буферизацию для небольших сообщений, чтобы использовать сеть более эффективно.

Если я правильно помню, out_batch_size в config.hpp 0MQ контролирует такое поведение.

Ответ 3

Одна вещь, на которую нужно обратить внимание (помимо того, что отмечали предыдущие комментаторы), - это ваша процедура выключения.

Фрагменты кода могут быть просто неполными, но я не вижу, как вы обрабатываете завершение работы. В частности, вы можете потерять последние отправленные сообщения. Взгляните на документацию для zmq_close, zmq_term и ZMQ_LINGER. Если вы фактически не вызываете эти функции и вместо этого просто завершаете приложение, тогда есть вероятность, что сообщения, которые были отправлены с помощью zmq_send(), но не были переданы в сеть, потерянный при завершении работы.

Чтобы проверить, какие сообщения будут потеряны, вы можете попытаться отправить порядковый номер в дополнение к отметке времени.

Ответ 4

Посмотрите guide.

  • издатель отправляет "привет"
  • каждый абонент, получающий "привет", отправляет сообщение издателю через разъем REQ/REP
  • Когда издатель получает достаточное сообщение REQ/REP, он начинает публиковать данные