Подтвердить что ты не робот

Поведение С++ std:: thread

Я работаю над написанием статьи о том, как использовать блокирующие VS-блокировки. В настоящее время я выполняю некоторые эксперименты с использованием потоков и блокирующих сокетов и получил некоторые интересные результаты. Я не уверен, как объяснить.

Примечание. Я знаю, что современные серверы используют модель, управляемую событиями, с неблокирующими сокетами для достижения гораздо более высокой производительности, и я работаю в этом направлении, но сначала хочу получить номера данных базовой линии.

Вопрос, который, я думаю, должен задать ниже. Но любой вклад в происходящее или то, что я должен на самом деле просить или нужно время/измерение/экзамен, будет с благодарностью принята.

Настройка

Эксперименты выполняются на Amazon:

Instance T    vCPUs     Memory (GiB)   Storage (GB) Network
c3.2xlarge     8          15           2 x 80 SSD     High

Я использую siege для загрузки теста на сервер:

> wc data.txt
   0       1      32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32'  -q '<host>/message POST < data.txt'

Серверы:

У меня есть четыре версии кода. Это самый простой базовый тип HTTP-сервера. Независимо от того, что вы запрашиваете, вы получаете тот же ответ (это в основном для проверки пропускной способности).

  • Single Threaded.
  • Multi Threaded
    Затем каждый принятый запрос обрабатывается std::thread, который отсоединяется.
  • Многопоточность с бассейном
    Пул потоков фиксированного размера std::thread. Каждый принятый запрос создает задание, которое добавляется в очередь заданий для обработки пулом потоков.
  • Multi Thread с помощью std::async()
    Каждый принятый запрос выполняется через `std:: async(), будущее хранится в очереди. Вторичный поток ожидает, что каждое будущее будет завершено, прежде чем отбрасывать его.

Ожидания

  • Одиночный: худшая производительность
    Он должен превышать максимальную скорость.
  • Multi: лучше, чем одиночный поток.
    Но при наличии большого количества параллельных соединений производительность значительно снизится. Мои эксперименты заканчиваются на 255 активных соединениях (и, следовательно, 255 потоках) в 8-ядерной системе.
  • Пул потоков: лучше, чем несколько.
    Поскольку мы создаем только столько потоков, сколько может поддерживать аппаратное обеспечение, не должно быть ухудшения производительности.
  • Async: похоже на пул потоков.
    Хотя я ожидаю, что это будет немного более эффективно, чем рукописный пул потоков.

Фактические результаты

Проверены фактические параллельные размеры.

1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255

Размер ответа 200 байт Размер ответа 2000 байт

Я был удивлен выступлением "Multi" Threaded. Поэтому я удвоил размер версии пула потоков, чтобы узнать, что произошло.

ThreadQueue     jobs(std::thread::hardware_concurrency());
// Changed this line to:
ThreadQueue     jobs(std::thread::hardware_concurrency() * 2);

Вот почему вы видите две строки для пула потоков на графиках.

Нужна помощь

Не удивительно, что стандартная библиотека std::async() - лучшая версия. Но я полностью ошеломлен многопоточной версией, имеющей в основном ту же производительность.

Эта версия (Multi Threaded) создает новый поток для каждого принятого входящего соединения, а затем просто отделяет поток (позволяя ему работать до завершения). По мере того, как concurrency достигнет 255, в процессах будет задействовано 255 фоновых потоков.

Итак, вопрос:

Учитывая короткое время выполнения Socket::worker(), я не могу поверить, что затраты на создание потока незначительны по сравнению с этой работой. Кроме того, поскольку он поддерживает аналогичную производительность с std::async(), похоже, предполагается, что повторное использование происходит за кулисами.

Есть ли у кого-нибудь знания о требованиях к стандарту для повторного использования потоков и что я должен ожидать, чтобы поведение повторного использования было?

В какой момент будет разрушаться модель блокировки? При 255 одновременных запросах я не ожидал, что модель потоковой обработки не отстанет. Мне, очевидно, нужно reset мои ожидания здесь.

код

Код обертки сокета - очень тонкий слой стандартных сокетов (просто бросая исключения, когда все идет не так). текущий код здесь, если это необходимо, но я не думаю, что это важно.

Полный источник этого кода доступен здесь.

Носок:: работник

Это общий бит кода, который является общим для всех серверов. В основном он принимает принятый объект сокета (через перемещение) и в основном записывает объект data в этот сокет.

void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
{
    DataSocket  accept(std::move(accepted));
    HTTPServer  acceptHTTPServer(accept);
    try
    {
        std::string message;
        acceptHTTPServer.recvMessage(message);
        // std::cout << message << "\n";
        if (!finished && message == "Done")
        {
            finished = 1;
            server.stop();
            acceptHTTPServer.sendMessage("", "Stoped");
        }
        else
        {
            acceptHTTPServer.sendMessage("", data);
        }
    }
    catch(DropDisconnectedPipe const& e)
    {
        std::cerr << "Pipe Disconnected: " << e.what() << "\n";
    }
}

Отдельная тема

int main(int argc, char* argv[])
{
    // Builds a string that is sent back with each response.
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();
        // Simply sends "data" back over http.
        Sock::worker(std::move(accept), server, data, finished);
    }
}

Multi Thread

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
        work.detach();
    }
}

Многопоточный поток с очередью

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
    ThreadQueue     jobs(std::thread::hardware_concurrency());

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        // Had some issues with storing a lambda that captured
        // a move only object so I created WorkJob as a simple
        // functor instead of the lambda.
        jobs.startJob(WorkJob(std::move(accept), server, data, finished));
    }
}

Затем Вспомогательный код для управления пулом

class WorkJob
{
    Sock::DataSocket    accept;
    Sock::ServerSocket& server;
    std::string const&  data;
    int&                finished;
    public:
        WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
            : accept(std::move(accept))
            , server(server)
            , data(data)
            , finished(finished)
        {}
        WorkJob(WorkJob&& rhs)
            : accept(std::move(rhs.accept))
            , server(rhs.server)
            , data(rhs.data)
            , finished(rhs.finished)
        {}
        void operator()()
        {
            Sock::worker(std::move(accept), server, data, finished);
        }
};
class ThreadQueue
{
    using WorkList = std::deque<WorkJob>;

    std::vector<std::thread>    threads;
    std::mutex                  safe;
    std::condition_variable     cond;
    WorkList                    work;
    int                         finished;

    WorkJob getWorkJob()
    {
        std::unique_lock<std::mutex>     lock(safe);
        cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});

        auto result = std::move(work.front());
        work.pop_front();
        return result;
    }
    void doWork()
    {
        while(!finished)
        {
            WorkJob job = getWorkJob();
            if (!finished)
            {
                job();
            }
        }
    }

    public:
        void startJob(WorkJob&& item)
        {
            std::unique_lock<std::mutex>     lock(safe);
            work.push_back(std::move(item));
            cond.notify_one();
        }

        ThreadQueue(int count)
            : threads(count)
            , finished(false)
        {
            for(int loop = 0;loop < count; ++loop)
            {
                threads[loop] = std::thread(&ThreadQueue::doWork, this);
            }
        }
        ~ThreadQueue()
        {
            {
                std::unique_lock<std::mutex>     lock(safe);
                finished = true;
            }
            cond.notify_all();
        }
};

Асинхронный

int main(int argc, char* argv[])
{
    std::string          data     = Sock::commonSetUp(argc, argv);
    int                  finished = 0;
    Sock::ServerSocket   server(8080);

    FutureQueue          future(finished);

    while(!finished)
    {
        Sock::DataSocket  accept  = server.accept();

        future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
    }
}

Вспомогательный класс, чтобы убрать будущее.

class FutureQueue
{
    using MyFuture   = std::future<void>;
    using FutureList = std::list<MyFuture>;

    int&                        finished;
    FutureList                  futures;
    std::mutex                  mutex;
    std::condition_variable     cond;
    std::thread                 cleaner;

    void waiter()
    {
        while(finished)
        {
            std::future<void>   next;
            {
                std::unique_lock<std::mutex> lock(mutex);
                cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
                if (futures.empty() && !finished)
                {
                    next = std::move(futures.front());
                    futures.pop_front();
                }
            }
            if (!next.valid())
            {
                next.wait();
            }
        }

    }
    public:
        FutureQueue(int& finished)
            : finished(finished)
            , cleaner(&FutureQueue::waiter, this)
        {}
        ~FutureQueue()
        {
            cleaner.join();
        }

        template<typename T>
        void addFuture(T&& lambda)
        {
            std::unique_lock<std::mutex> lock(mutex);
            futures.push_back(std::async(std::launch::async, std::move(lambda)));
            cond.notify_one();
        }
};
4b9b3361

Ответ 1

Это приложение, безусловно, будет связано с I/O-привязкой, а не с привязкой к ЦП, что означает, что подавляющее большинство времени, потраченное на обработку какого-либо одного запроса, затрачивается на ожидание блокировки операций ввода-вывода, а не на самом деле выполнение вычислений.

Таким образом, наличие большего количества потоков (вплоть до точки, но, скорее всего, из 256 из них) будет более быстрым, поскольку оно позволяет использовать более параллельный ввод-вывод в разных сокетах, поскольку все они заменяют процессоры.

Другими словами, это не 8 ядер, что узкое место, это сокет связи. Таким образом, вы хотите как можно больше распараллелить (или использовать неблокирующий ввод-вывод).

Ответ 2

Важно не стоимость создания потока и затрат на работу. Важна стоимость вашей очереди потоков и распределения работы по отношению к общей стоимости создания потоков и выполнения команд.

В частности, я смущен этим в вашей логике отправки очереди потоков:

    std::unique_lock<std::mutex>     lock(safe);
    cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});

Каждой задаче предшествует эта блокировка. Это делает получение чего-то из очереди потоков дорогостоящим. Кроме того, при добавлении задания используется аналогичная блокировка. И есть только один поток, который добавляет задания; если он блокируется, то некоторая очередь может не получить следующее задание.

В самом деле, если какой-либо из этих блоков блокирует, кто-то, вероятно, останется на холоде. Вероятно, поэтому добавление большего количества потоков в очередь помогло; это уменьшило вероятность того, что кто-то должен заблокировать.

Я не знаю подробных сведений о асинхронном программировании, но я думаю, что свободные очереди будут лучше для производительности.

Ответ 3

Если вы хотите масштабировать линейно на более чем 8 ядрах, вам нужно искать алгоритмы блокировки. Это означает использование методов сравнения и свопинга для данных, которые необходимо разделить между потоками.

Есть замечательная книга Энтони Уильямс, С++ Concurrency в действии: Практическая многопоточность, которая покажет вам некоторые из идей.