Подтвердить что ты не робот

Boost:: asio - как прервать заблокированный поток сервера tcp?

Я работаю над многопоточным приложением, в котором один поток действует как tcp-сервер, который получает команды от клиента. В потоке используется сокет Boost и акцептор для ожидания подключения клиента, получения команды от клиента, передачи команды остальной части приложения и последующего ожидания. Здесь код:

void ServerThreadFunc()
{
    using boost::asio::ip::tcp;
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port_no));

    for (;;)
    {
        //  listen for command connection
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        //  connected; receive command
        boost::array<char,256> msg_buf;
        socket.receive(boost::asio::buffer(msg_buf));

        //  do something with received bytes here
    }
}

Этот поток тратит большую часть своего времени на вызов acceptor.accept(). В настоящий момент поток прекращается только после выхода приложения. К сожалению, это приводит к сбою после возврата main() - я верю, потому что поток пытается получить доступ к протоколу однопользовательского входа приложения после того, как синглтон был уничтожен. (Это было так, когда я приехал сюда, честный гув.)

Как я могу закрыть этот поток до конца, когда придет время для выхода приложения? Я прочитал, что вызов блокировки accept() в сыром соке может быть прерван закрытием сокета из другого потока, но это не работает на разъем Boost. Я попытался преобразовать логику сервера в асинхронный ввод-вывод, используя Пример асинхронного tcp-эхо-сервера Boost, но это похоже на обмен блокирующим вызовом до acceptor::accept() для блокирующего вызова io_service::run(), поэтому я остаюсь с той же проблемой: заблокированный вызов, который я не могу прерывать. Любые идеи?

4b9b3361

Ответ 1

Короче говоря, есть два варианта:

  • Измените код на асинхронный (acceptor::async_accept() и async_read), запустите в цикле событий через io_service::run() и отмените его через io_service::stop().
  • Сила блокирует вызовы для прерывания с помощью механики более низкого уровня, например сигналов.

Я бы порекомендовал первый вариант, так как он скорее переносится и проще в обслуживании. Важно понимать, что io_service::run() блокируется только при ожидающей работе. Когда вызывается io_service::stop(), он попытается вызвать все потоки, заблокированные на io_service::run(), чтобы вернуться как можно быстрее; он не будет прерывать синхронные операции, такие как acceptor::accept() и socket::receive(), даже если синхронные операции вызывают в цикле событий. Важно отметить, что io_service::stop() является неблокирующим вызовом, поэтому синхронизация с потоками, которые были заблокированы на io_service::run(), должна использовать другой механизм, такой как thread::join().

Вот пример, который будет работать в течение 10 секунд и прослушивает порт 8080:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void StartAccept( boost::asio::ip::tcp::acceptor& );

void ServerThreadFunc( boost::asio::io_service& io_service )
{
  using boost::asio::ip::tcp;
  tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), 8080 ) );

  // Add a job to start accepting connections.
  StartAccept( acceptor );

  // Process event loop.
  io_service.run();

  std::cout << "Server thread exiting." << std::endl;
}

void HandleAccept( const boost::system::error_code& error,
                   boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
                   boost::asio::ip::tcp::acceptor& acceptor )
{
  // If there was an error, then do not add any more jobs to the service.
  if ( error )
  {
    std::cout << "Error accepting connection: " << error.message() 
              << std::endl;
    return;
  }

  // Otherwise, the socket is good to use.
  std::cout << "Doing things with socket..." << std::endl;

  // Perform async operations on the socket.

  // Done using the socket, so start accepting another connection.  This
  // will add a job to the service, preventing io_service::run() from
  // returning.
  std::cout << "Done using socket, ready for another connection." 
            << std::endl;
  StartAccept( acceptor );
};

void StartAccept( boost::asio::ip::tcp::acceptor& acceptor )
{
  using boost::asio::ip::tcp;
  boost::shared_ptr< tcp::socket > socket(
                                new tcp::socket( acceptor.get_io_service() ) );

  // Add an accept call to the service.  This will prevent io_service::run()
  // from returning.
  std::cout << "Waiting on connection" << std::endl;
  acceptor.async_accept( *socket,
    boost::bind( HandleAccept,
      boost::asio::placeholders::error,
      socket,
      boost::ref( acceptor ) ) );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Create server thread that will start accepting connections.
  boost::thread server_thread( ServerThreadFunc, boost::ref( io_service ) );

  // Sleep for 10 seconds, then shutdown the server.
  std::cout << "Stopping service in 10 seconds..." << std::endl;
  boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
  std::cout << "Stopping service now!" << std::endl;

  // Stopping the io_service is a non-blocking call.  The threads that are
  // blocked on io_service::run() will try to return as soon as possible, but
  // they may still be in the middle of a handler.  Thus, perform a join on 
  // the server thread to guarantee a block occurs.
  io_service.stop();

  std::cout << "Waiting on server thread..." << std::endl;
  server_thread.join();
  std::cout << "Done waiting on server thread." << std::endl;

  return 0;
}

Во время работы я открыл два подключения. Вот результат:

Stopping service in 10 seconds...
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Stopping service now!
Waiting on server thread...
Server thread exiting.
Done waiting on server thread.

Ответ 2

Когда вы получаете событие, на которое время для выхода, вы можете вызвать acceptor.cancel(), который отменит ожидающий прием (с кодом ошибки operation_canceled). В некоторых системах вам также может потребоваться close() акцептор, чтобы быть в безопасности.

Ответ 3

Если дело доходит до него, вы можете открыть временное клиентское соединение с ним на localhost - это вызовет его. Вы даже можете отправить ему специальное сообщение, чтобы вы могли закрыть свой сервер из паба - для этого должно быть приложение:)

Ответ 4

Просто закройте выключение с помощью встроенного дескриптора и параметра SHUT_RD, чтобы отменить существующую операцию приема (принятия).