Подтвердить что ты не робот

Boost:: asio:: безопасность потока сокетов

(Это упрощенная версия моего первоначального вопроса)

У меня есть несколько потоков, которые пишут в сокет asio. Это, кажется, работает очень хорошо, без проблем.

В документации говорится, что общий сокет не является потокобезопасным (здесь, путь вниз внизу), поэтому мне интересно, следует ли мне защищать сокет с мьютексом или что-то в этом роде.

Этот question настаивает на необходимости защиты, но не дает никаких советов о том, как это сделать.

Все ответы на мой оригинальный вопрос также настаивали на том, что я делаю опасно, и больше всего призывал меня заменить мои записи на async_writes или даже более сложные вещи. Тем не менее, я неохотно это делаю, так как это усложнит код, который уже работает, и никто из ответчиков не убедил меня, что они знают, о чем они говорят, - они, казалось, прочитали ту же документацию, что и я, и догадывались, так же как я был.

Итак, я написал простую программу для стресс-тестирования записи в общий сокет из двух потоков.

Вот сервер, который просто выписывает все, что получает от клиента

int main()
{
    boost::asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

    tcp::socket socket(io_service);
    acceptor.accept(socket);

    for (;;)
    {
        char mybuffer[1256];
        int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
        mybuffer[len] = '\0';
        std::cout << mybuffer;
        std::cout.flush();

    }

  return 0;
}

Вот клиент, который создает два потока, которые записывают в общий сокет так быстро, как могут

boost::asio::ip::tcp::socket * psocket;

void speaker1()
{
    string msg("speaker1: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}
void speaker2()
{
    string msg("speaker2: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}

int main(int argc, char* argv[])
{

    boost::asio::io_service io_service;

  // connect to server

    tcp::resolver resolver(io_service);
    tcp::resolver::query query("localhost", "3001");
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;
    psocket = new tcp::socket(io_service);
    boost::system::error_code error = boost::asio::error::host_not_found;
    while (error && endpoint_iterator != end)
    {
        psocket->close();
        psocket->connect(*endpoint_iterator++, error);
    }


    boost::thread t1( speaker1 );
    boost::thread t2( speaker2 );

    Sleep(50000);

}

Это работает! Отлично, насколько я могу судить. Клиент не падает. Сообщения поступают на сервер без искажений. Они обычно приходят попеременно, по одному от каждой нити. Иногда один поток получает два или три сообщения перед другим, но я не думаю, что это проблема, если нет никаких искажений и все сообщения поступают.

Мое заключение: сокет не может быть потокобезопасным в некотором теоретическом смысле, но так сложно заставить его потерпеть неудачу, что я не буду беспокоиться об этом.

4b9b3361

Ответ 1

После переопределения кода для async_write я теперь убежден, что любая операция записи является потокобезопасной тогда и только тогда, когда размер пакета меньше

default_max_transfer_size = 65536;

Случается, что как только async_write вызывается, async_write_some вызывается в том же потоке. Любые потоки в пуле, вызывающие какую-либо форму io_service:: run, будут продолжать вызывать async_write_some для этой операции записи до ее завершения.

Эти вызовы async_write_some могут и будут чередоваться, если их нужно вызывать более одного раза (пакеты больше 65536).

ASIO не ставит в очередь записи в сокет, как вы ожидали, один заканчивается за другим. Чтобы обеспечить безопасную запись в потоке и interleave, рассмотрим следующий фрагмент кода:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_all(),
            boost::bind(&my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                                    boost::asio::buffer(
                                           m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
                boost::asio::transfer_all(),
                boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
    const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
            boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::bind(
                            &chreosis_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
            m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }

Это в основном делает очередь для отправки одного пакета за раз. async_write вызывается только после успешного завершения первой записи, которая затем вызывает исходный обработчик для первой записи.

Было бы проще, если бы asio сделало очереди записи автоматическими для каждого сокета/потока.

Ответ 2

Используйте boost::asio::io_service::strand для асинхронных обработчиков, которые не являются потокобезопасными.

Строка определяется как строго последовательный вызов события обработчики (т.е. нет одновременного вызова). Использование прядей позволяет выполнение кода в многопоточной программе без необходимости явная блокировка (например, использование мьютексов).

таймер-учебник, вероятно, самый простой способ обернуть голову вокруг нитей.

Ответ 3

Ключевым для понимания ASIO является понимание того, что обработчики завершения выполняются только в контексте потока, который вызвал io_service.run() независимо от того, какой поток называется асинхронным методом. Если вы вызываете только io_service.run() в одном потоке, то все обработчики завершения будут выполняться последовательно в контексте этого потока. Если вы вызвали io_service.run() в нескольких потоках, то обработчики завершения будут выполняться в контексте одного из этих потоков. Вы можете подумать об этом как о пуле потоков, где потоки в пуле - это те потоки, которые вызвали io_service.run() на том же объекте io_service.

Если у вас есть несколько потоков, вызовите io_service.run(), вы можете заставить сериализовать обработчики завершения, поместив их в strand.

Чтобы ответить на последнюю часть вашего вопроса, вы должны позвонить boost::async_write(). Это отправит операцию записи в поток, который вызвал io_service.run(), и вызывается обработчик завершения при выполнении записи. Если вам нужно сериализовать эту операцию, то это немного сложнее, и вы должны прочитать документацию по цепочкам здесь.

Ответ 4

Похоже, этот вопрос сводится к следующему:

что происходит, когда async_write_some() вызывается одновременно в одном сокете из двух разных потоков

Я считаю, что это точно операция, которая не является потокобезопасной. Порядок, в котором эти буферы выходят на провод, составляет undefined, и они могут даже чередоваться. Особенно если вы используете удобную функцию async_write(), поскольку она реализована как серия вызовов async_write_some() под ней, пока не будет отправлен весь буфер. В этом случае каждый фрагмент, который отправляется из двух потоков, может чередоваться случайным образом.

Единственный способ защитить вас от удара этого случая - создать свою программу, чтобы избежать подобных ситуаций.

Один из способов сделать это - написать буфер отправки прикладного уровня, который один поток отвечает за нажатие на сокет. Таким образом, вы можете защитить только буфер отправки. Имейте в виду, что простой std::vector не будет работать, так как добавление байтов в конец может закончиться перераспределением его, возможно, пока есть выдающийся async_write_some(), ссылающийся на него. Вместо этого, вероятно, неплохо использовать связанный список буферов и использовать функцию разброса/сбора asio.

Ответ 5

Сначала рассмотрим, что сокет является потоком и не подвергается внутренней защите от одновременного чтения и/или записи. Существует три различных соображения.

  • Параллельное выполнение функций, которые обращаются к одному и тому же сокету.
  • Параллельное выполнение делегатов, которые заключают один и тот же сокет.
  • Выполнение чередования делегатов, которые записываются в один и тот же сокет.

пример чата является асинхронным, но не параллельным. io_service запускать из одного потока, делая все клиентские операции чата несовпадающими. Другими словами, он избегает всех этих проблем. Даже async_write должен внутренне завершить отправку всех частей сообщения до начала любой другой работы, избегая проблемы с перемежением.

Обработчики вызываются только потоком, который в настоящее время вызывает любую перегрузку run(), run_one(), poll() или poll_one() для io_service.

Отправляя работу в единственный поток, io_service, другие потоки могут безопасно избегать как concurrency, так и блокировать путем очередности работы в io_service. Если, однако, ваш сценарий исключает возможность буферизации всей работы для данного сокета, все становится более сложным. Возможно, вам придется блокировать сокетную связь (но не потоки), в отличие от очереди в очереди на неопределенное время. Кроме того, рабочая очередь может быть очень сложной для управления, поскольку она полностью непрозрачна.

Если ваш io_service запускает более одного потока, вы все же можете легко избежать вышеуказанных проблем, но вы можете только вызывать чтение или запись из обработчиков других чтений или записей (и при запуске). Эти последовательности обеспечивают доступ к сокету, оставаясь неблокируемым. Безопасность возникает из-за того, что шаблон использует только один поток в любой момент времени. Но публикация работы из независимого потока проблематична - даже если вы не против ее буферизации.

A strand - это класс asio, который отправляет работу в io_service таким образом, который обеспечивает неконкурентный вызов. Однако использование strand для вызова async_read и/или async_write решает только первую из трех проблем. Эти функции встроены в службу io_service сокета. Если эта служба работает с несколькими потоками, то работу можно выполнять одновременно.

Итак, как вы, для данного сокета, безопасно вызывать async_read и/или async_write одновременно?

  • При одновременных вызовах первая проблема может быть решена с помощью мьютекса или строки, используя первый, если вы не хотите буферизировать работу, а вторую - если вы это сделаете. Это защищает сокет во время вызова функции, но ничего не делает для других проблем.

  • Вторая проблема кажется сложнее, потому что трудно понять, что происходит внутри кода, выполняемого асинхронно из двух функций. Функция async выполняет как постработу, так и io_service сокета.

Из источника сокета boost:

/**
 * This constructor creates a stream socket without opening it. The socket
 * needs to be opened and then connected or accepted before data can be sent
 * or received on it.
 *
 * @param io_service The io_service object that the stream socket will use to
 * dispatch handlers for any asynchronous operations performed on the socket.
 */
explicit basic_stream_socket(boost::asio::io_service& io_service)
: basic_socket<Protocol, StreamSocketService>(io_service)
{
}

И из io_service:: run()

/**
 * The run() function blocks until all work has finished and there are no
 * more handlers to be dispatched, or until the io_service has been stopped.
 *
 * Multiple threads may call the run() function to set up a pool of threads
 * from which the io_service may execute handlers. All threads that are
 * waiting in the pool are equivalent and the io_service may choose any one
 * of them to invoke a handler.
 *
 * ...
 */
BOOST_ASIO_DECL std::size_t run();

Итак, если вы даете многосетевое сокет, у него нет выбора, кроме как использовать несколько потоков - несмотря на то, что они не являются потокобезопасными. Единственный способ избежать этой проблемы (помимо замены реализации сокета) - дать сокет только один поток для работы. Для одного сокета это то, что вы хотите в любом случае (так что не надо сбегать, чтобы написать замену).

  1. Третью проблему можно решить, используя (другой) мьютекс, который заблокирован перед async_write, передан в обработчик завершения и разблокирован в этой точке. Это не позволит любому вызывающему абоненту начать запись до тех пор, пока все части предыдущей записи не будут завершены.

Обратите внимание, что сообщения async_write работают в очереди - то, как он может вернуться почти сразу. Если вы слишком много работаете над этим, вам, возможно, придется иметь дело с некоторыми последствиями. Несмотря на использование одного потока io_service для сокета, у вас может быть какое-то количество сообщений, отправляющих работу через параллельные или неконкурентные вызовы async_write.

С другой стороны, async_read прост. Проблема с чередованием отсутствует, и вы просто возвращаетесь назад из обработчика предыдущего вызова. Вы можете или не хотите отправлять полученную работу в другой поток или очередь, но если вы выполняете ее в потоке обработчика завершения, вы просто блокируете все чтения и записи в однопоточном сокете.

UPDATE

Я еще немного углубился в реализацию базовой реализации потока сокетов (для одной платформы). Похоже, что сокет последовательно выполняет вызовы сокетов платформы в вызывающем потоке, а не делегат, отправленный в io_service. Другими словами, несмотря на то, что async_read и async_write появляются немедленно, они фактически выполняют все операции сокета перед возвратом. Только обработчики отправляются в io_service. Это не документировано и не раскрывается кодом exacle, который я просмотрел, но, полагая, что это гарантированное поведение, оно значительно влияет на вторую проблему выше.

Предполагая, что работа, отправленная в io_service, не включает операции сокета, нет необходимости ограничивать io_service единственным потоком. Однако он усиливает важность защиты от одновременного выполнения асинхронных функций. Так, например, если следовать примеру чата, но вместо этого добавляет другой поток в io_service, возникает проблема. С вызовами функции async, выполняемыми внутри обработчиков функций, вы выполняете параллельное выполнение функции. Для этого потребуется либо мьютекс, либо все вызовы функции асинхронного вызова для выполнения на цепочке.

ОБНОВЛЕНИЕ 2

В отношении третьей проблемы (чередование), если размер данных превышает 65536 байт, работа разбивается на внутреннюю на async_write и отправляется по частям. Но важно понимать, что если в io_service имеется более одного потока, куски работы, отличные от первого, будут отправляться в разные потоки. Все это происходит внутри функции async_write до того, как вызывается обработчик завершения. Реализация создает собственные обработчики промежуточного завершения и использует их для выполнения всех операций, кроме первой операции сокета.

Это означает, что любой защитник вокруг вызова async_write (mutex или strand) будет не защищать сокет, если есть несколько потоков io_service и более 64 кбайт данных для публикации ( по умолчанию это может меняться). Поэтому в этом случае чередующийся предохранитель необходим не только для безопасности чередования, но и для обеспечения безопасности резьбы гнезда. Я проверил все это в отладчике.

ВАРИАНТ MUTEX

Функции async_read и async_write внутренне используют io_service, чтобы получить потоки, на которых следует отправлять обработчики завершения, блокируя до тех пор, пока потоки не будут доступны. Это делает их опасными для защиты с помощью замков мьютексов. Когда мьютекс используется для защиты этих функций, тупик будет возникать, когда потоки будут поддерживаться против блокировки, голодая на io_service. Учитывая, что нет другого способа защитить async_write при отправке > 64k с помощью многопоточного io_service, он фактически блокирует нас в один поток в этом сценарии - который, конечно же, разрешает вопрос concurrency.

Ответ 6

В соответствии с новатором 2008 года повышают 1,37 обновления asio, некоторые синхронные операции, в том числе записи "теперь потокобезопасны", позволяющие "одновременные синхронные операции в отдельном сокете, если они поддерживаются ОС" увеличить историю 1.37.0. Это, казалось бы, поддерживает то, что вы видите, но упрощение предложения "Общие объекты: небезопасное" остается в ускоренных документах для ip:: tcp:: socket.

Ответ 7

Это зависит от доступа к одному и тому же объекту сокета из нескольких потоков. Скажем, у вас есть два потока, работающих с такой же функцией io_service::run().

Если, например, вы одновременно выполняете чтение и запись или можете выполнять операцию отмены из другой темы. Тогда это не безопасно.

Однако, если ваш протокол выполняет только одну операцию за раз.

  • Если только один поток запускает прогон io_service, тогда проблем нет. Если вы хотите что-то выполнить в сокете из другого потока, вы можете вызвать io_service:: post() с обработчик, который выполняет эту операцию на сокете, чтобы он выполнялся в том же потоке.
  • Если у вас есть несколько потоков, выполняющих io_service::run, и вы пытаетесь сделать операции одновременно - пусть отмените и прочитайте операцию, тогда вы должны использовать пряди. В документации Boost.Asio есть учебное пособие.

Ответ 8

Я провел обширные тесты и не смог сломать asio. Даже без блокировки каких-либо мьютексов.

Тем не менее я бы посоветовал использовать async_read и async_write с помощью мьютекса вокруг каждого из этих вызовов.

Я считаю, что только обратная связь заключается в том, что ваши обработчики завершения могут быть вызваны одновременно, если у вас есть более одного потока, вызывающего io_service::run.

В моем случае это не было проблемой. Вот мой тестовый код:

#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/asio.hpp>
#include <vector>

using namespace std;
char databuffer[256];
vector<boost::asio::const_buffer> scatter_buffer;
boost::mutex my_test_mutex;
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_service *io) {
while(1) {
    boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));

    //my_test_mutex.lock(); // It would be safer 
    socket->async_send(scatter_buffer, boost::bind(&mycallback));
    //my_test_mutex.unlock(); // It would be safer
}
}
int main(int argc, char **argv) {

for(int i = 0; i < 256; ++i)
    databuffer[i] = i;

for(int i = 0; i < 4*90; ++i)
    scatter_buffer.push_back(boost::asio::buffer(databuffer));
boost::asio::io_service my_test_ioservice;
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice);
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice);
boost::asio::ip::tcp::resolver::query  my_test_tcp_query("192.168.1.10", "40000");
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query);
boost::asio::connect(my_test_socket, my_test_tcp_iterator);
for (size_t i = 0; i < 8; ++i) {
    boost::shared_ptr<boost::thread> thread(
            new boost::thread(my_test_func, &my_test_socket, &my_test_ioservice));
}

while(1) {
    my_test_ioservice.run_one();
    boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
}
return 0;

}

И вот мой самодельный сервер в python:

import socket
def main():
    mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    mysocket.bind((socket.gethostname(), 40000))
    mysocket.listen(1)

    while 1:
        (clientsocket, address) = mysocket.accept()
        print("Connection from: " + str(address))
        i = 0
        count = 0
        while i == ord(clientsocket.recv(1)):
            i += 1
            i %= 256

            count+=1
            if count % 1000 == 0:
                print(count/1000)
        print("Error!")
return 0

if __name__ == '__main__':
    main()

Обратите внимание, что запуск этого кода может привести к трэш-компьютеру.