Подтвердить что ты не робот

Использование ZeroMQ вместе с Boost:: ASIO

У меня есть приложение на С++, которое использует ZeroMQ для некоторых сообщений. Но он также должен обеспечить соединение SGCI для веб-службы на основе AJAX/Comet.

Для этого мне нужен обычный TCP-сокет. Я мог бы сделать это с помощью обычных гнезд Posix, но оставайтесь на переносной платформе и упростите мою жизнь (надеюсь...) Я думал об использовании Boost:: ASIO.

Но теперь у меня есть столкновение ZMQ, которое хочет использовать его собственный zmq_poll() и ASIO it io_service.run()...

Есть ли способ заставить ASIO работать вместе с 0MQ zmq_poll()?

Или есть другой рекомендуемый способ для достижения такой настройки?

Примечание. Я мог бы решить это, используя несколько потоков - но это всего лишь небольшой ядро ​​/процессорный ящик, который будет запускать эту программу с очень низким количеством трафика SCGI, поэтому многопоточность будет пустой тратой ресурсов...

4b9b3361

Ответ 1

После прочтения документации здесь и здесь, в частности, этот пункт

ZMQ_FD: получить файловый дескриптор, связанный с сокетом. ZMQ_FD опция должна получить файловый дескриптор, связанный с указанный сокет. Дескриптор возвращаемого файла можно использовать для интегрировать сокет в существующий цикл событий; библиотека ØMQ должен сигнализировать о любых ожидающих событиях на сокете в срабатывании с краем путем создания файлового дескриптора для чтения.

Я думаю, вы можете использовать null_buffers для каждого zmq_pollitem_t и отложить цикл событий до io_service, полностью обходя zmq_poll() вообще. Однако в вышеупомянутой документации есть некоторые предостережения, особенно

Возможность чтения из дескриптора возвращенного файла не обязательно указывать, что сообщения доступны для чтения или может быть записано в базовый сокет; приложения должны извлекать фактическое состояние события с последующим извлечением ZMQ_EVENTS вариант.

Итак, когда обработчик одного из ваших гнезд zmq запущен, вам нужно будет сделать немного больше работы, прежде чем обрабатывать событие, которое, как я думаю. Некомпилированный псевдокод ниже

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

обратите внимание, что вам не нужно использовать лямбда здесь, boost::bind для функции-члена будет достаточно.

Ответ 2

В конце концов я понял, что есть два возможных решения:

  • Сэм Миллер, где мы используем цикл событий ASIO
  • Цикл событий ZeroMQ, получая дескрипторы файла ASIO, хотя методы .native() acceptor и socket и вставляя их в массив zmq_pollitem_t

Я согласился с ответом Сэма Миллера как на то, что для меня лучшее решение в случае SCGI, где постоянно создаются и заканчиваются постоянно новые соединения. Обработка каждого изменяющегося массива zmq_pollitem_t - это большая проблема, которой можно избежать с помощью цикла событий ASIO.

Ответ 3

Получение сокета для ZeroMQ - самая маленькая часть битвы. ZeroMQ основан на протоколе который накладывается поверх TCP, поэтому вам придется повторно реализовать ZeroMQ в пользовательском Boost.Asio io_service, если вы идете по этому маршруту, Я столкнулся с той же проблемой при создании асинхронной службы ENet с использованием Boost.Asio, сначала попробовав поймать трафик от клиента ENet, используя Boost.Asio UDP. ENet - это протокол TCP, наложенный на UDP, поэтому все, что я достиг в этот момент, - это захват пакетов в практически бесполезном состоянии.

Boost.Asio основан на шаблонах, а встроенные шаблоны io_service используют для обертывания библиотеки системных сокетов для создания служб TCP и UDP. Моим окончательным решением было создать пользовательский io_service, который обернул библиотеку ENet, а не библиотеку системных сокетов, позволяя ей использовать транспортные функции ENet, вместо того, чтобы переопределять их с помощью встроенного транспорта UDP.

То же самое можно сделать для ZeroMQ, но ZeroMQ уже является очень высокопроизводительной сетевой библиотекой, которая сама по себе уже предоставляет асинхронный ввод-вывод. Я думаю, вы можете создать жизнеспособное решение, получая сообщения с использованием существующего API ZeroMQ и передавая сообщения в пул потоков io_service. Таким образом, сообщения/задачи будут по-прежнему обрабатываться асинхронно с использованием диаграммы Boost.Asio без необходимости перезаписывать что-либо. ZeroMQ предоставит асинхронный ввод-вывод, Boost.Asio предоставит обработчики/рабочие задачи async.

Существующий io_service все еще может быть подключен к существующему сокету TCP, позволяя threadpool обрабатывать как TCP (HTTP в вашем случае), так и ZeroMQ. Это вполне возможно в такой настройке для обработчиков задач ZeroMQ для доступа к объектам сеанса служб TCP, что позволяет отправлять результаты сообщения/задачи ZeroMQ обратно на клиент TCP.

Ниже приведена только иллюстрация концепции.

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}

Ответ 4

Решением является также опрос вашего io_service, а не run().

Просмотрите это решение для некоторого poll() info.

Использование опроса вместо запуска позволит вам опросить zmq-соединения без каких-либо проблем с блокировкой.