Я работаю над написанием статьи о том, как использовать блокирующие VS-блокировки. В настоящее время я выполняю некоторые эксперименты с использованием потоков и блокирующих сокетов и получил некоторые интересные результаты. Я не уверен, как объяснить.
Примечание. Я знаю, что современные серверы используют модель, управляемую событиями, с неблокирующими сокетами для достижения гораздо более высокой производительности, и я работаю в этом направлении, но сначала хочу получить номера данных базовой линии.
Вопрос, который, я думаю, должен задать ниже. Но любой вклад в происходящее или то, что я должен на самом деле просить или нужно время/измерение/экзамен, будет с благодарностью принята.
Настройка
Эксперименты выполняются на Amazon:
Instance T vCPUs Memory (GiB) Storage (GB) Network
c3.2xlarge 8 15 2 x 80 SSD High
Я использую siege для загрузки теста на сервер:
> wc data.txt
0 1 32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32' -q '<host>/message POST < data.txt'
Серверы:
У меня есть четыре версии кода. Это самый простой базовый тип HTTP-сервера. Независимо от того, что вы запрашиваете, вы получаете тот же ответ (это в основном для проверки пропускной способности).
- Single Threaded.
- Multi Threaded
Затем каждый принятый запрос обрабатываетсяstd::thread
, который отсоединяется. - Многопоточность с бассейном
Пул потоков фиксированного размераstd::thread
. Каждый принятый запрос создает задание, которое добавляется в очередь заданий для обработки пулом потоков. - Multi Thread с помощью
std::async()
Каждый принятый запрос выполняется через `std:: async(), будущее хранится в очереди. Вторичный поток ожидает, что каждое будущее будет завершено, прежде чем отбрасывать его.
Ожидания
- Одиночный: худшая производительность
Он должен превышать максимальную скорость. - Multi: лучше, чем одиночный поток.
Но при наличии большого количества параллельных соединений производительность значительно снизится. Мои эксперименты заканчиваются на 255 активных соединениях (и, следовательно, 255 потоках) в 8-ядерной системе. - Пул потоков: лучше, чем несколько.
Поскольку мы создаем только столько потоков, сколько может поддерживать аппаратное обеспечение, не должно быть ухудшения производительности. - Async: похоже на пул потоков.
Хотя я ожидаю, что это будет немного более эффективно, чем рукописный пул потоков.
Фактические результаты
Проверены фактические параллельные размеры.
1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255
Я был удивлен выступлением "Multi" Threaded. Поэтому я удвоил размер версии пула потоков, чтобы узнать, что произошло.
ThreadQueue jobs(std::thread::hardware_concurrency());
// Changed this line to:
ThreadQueue jobs(std::thread::hardware_concurrency() * 2);
Вот почему вы видите две строки для пула потоков на графиках.
Нужна помощь
Не удивительно, что стандартная библиотека std::async()
- лучшая версия. Но я полностью ошеломлен многопоточной версией, имеющей в основном ту же производительность.
Эта версия (Multi Threaded) создает новый поток для каждого принятого входящего соединения, а затем просто отделяет поток (позволяя ему работать до завершения). По мере того, как concurrency достигнет 255, в процессах будет задействовано 255 фоновых потоков.
Итак, вопрос:
Учитывая короткое время выполнения Socket::worker()
, я не могу поверить, что затраты на создание потока незначительны по сравнению с этой работой. Кроме того, поскольку он поддерживает аналогичную производительность с std::async()
, похоже, предполагается, что повторное использование происходит за кулисами.
Есть ли у кого-нибудь знания о требованиях к стандарту для повторного использования потоков и что я должен ожидать, чтобы поведение повторного использования было?
В какой момент будет разрушаться модель блокировки? При 255 одновременных запросах я не ожидал, что модель потоковой обработки не отстанет. Мне, очевидно, нужно reset мои ожидания здесь.
код
Код обертки сокета - очень тонкий слой стандартных сокетов (просто бросая исключения, когда все идет не так). текущий код здесь, если это необходимо, но я не думаю, что это важно.
Полный источник этого кода доступен здесь.
Носок:: работник
Это общий бит кода, который является общим для всех серверов. В основном он принимает принятый объект сокета (через перемещение) и в основном записывает объект data
в этот сокет.
void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
{
DataSocket accept(std::move(accepted));
HTTPServer acceptHTTPServer(accept);
try
{
std::string message;
acceptHTTPServer.recvMessage(message);
// std::cout << message << "\n";
if (!finished && message == "Done")
{
finished = 1;
server.stop();
acceptHTTPServer.sendMessage("", "Stoped");
}
else
{
acceptHTTPServer.sendMessage("", data);
}
}
catch(DropDisconnectedPipe const& e)
{
std::cerr << "Pipe Disconnected: " << e.what() << "\n";
}
}
Отдельная тема
int main(int argc, char* argv[])
{
// Builds a string that is sent back with each response.
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
while(!finished)
{
Sock::DataSocket accept = server.accept();
// Simply sends "data" back over http.
Sock::worker(std::move(accept), server, data, finished);
}
}
Multi Thread
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
while(!finished)
{
Sock::DataSocket accept = server.accept();
std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
work.detach();
}
}
Многопоточный поток с очередью
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
ThreadQueue jobs(std::thread::hardware_concurrency());
while(!finished)
{
Sock::DataSocket accept = server.accept();
// Had some issues with storing a lambda that captured
// a move only object so I created WorkJob as a simple
// functor instead of the lambda.
jobs.startJob(WorkJob(std::move(accept), server, data, finished));
}
}
Затем Вспомогательный код для управления пулом
class WorkJob
{
Sock::DataSocket accept;
Sock::ServerSocket& server;
std::string const& data;
int& finished;
public:
WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
: accept(std::move(accept))
, server(server)
, data(data)
, finished(finished)
{}
WorkJob(WorkJob&& rhs)
: accept(std::move(rhs.accept))
, server(rhs.server)
, data(rhs.data)
, finished(rhs.finished)
{}
void operator()()
{
Sock::worker(std::move(accept), server, data, finished);
}
};
class ThreadQueue
{
using WorkList = std::deque<WorkJob>;
std::vector<std::thread> threads;
std::mutex safe;
std::condition_variable cond;
WorkList work;
int finished;
WorkJob getWorkJob()
{
std::unique_lock<std::mutex> lock(safe);
cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
auto result = std::move(work.front());
work.pop_front();
return result;
}
void doWork()
{
while(!finished)
{
WorkJob job = getWorkJob();
if (!finished)
{
job();
}
}
}
public:
void startJob(WorkJob&& item)
{
std::unique_lock<std::mutex> lock(safe);
work.push_back(std::move(item));
cond.notify_one();
}
ThreadQueue(int count)
: threads(count)
, finished(false)
{
for(int loop = 0;loop < count; ++loop)
{
threads[loop] = std::thread(&ThreadQueue::doWork, this);
}
}
~ThreadQueue()
{
{
std::unique_lock<std::mutex> lock(safe);
finished = true;
}
cond.notify_all();
}
};
Асинхронный
int main(int argc, char* argv[])
{
std::string data = Sock::commonSetUp(argc, argv);
int finished = 0;
Sock::ServerSocket server(8080);
FutureQueue future(finished);
while(!finished)
{
Sock::DataSocket accept = server.accept();
future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
}
}
Вспомогательный класс, чтобы убрать будущее.
class FutureQueue
{
using MyFuture = std::future<void>;
using FutureList = std::list<MyFuture>;
int& finished;
FutureList futures;
std::mutex mutex;
std::condition_variable cond;
std::thread cleaner;
void waiter()
{
while(finished)
{
std::future<void> next;
{
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
if (futures.empty() && !finished)
{
next = std::move(futures.front());
futures.pop_front();
}
}
if (!next.valid())
{
next.wait();
}
}
}
public:
FutureQueue(int& finished)
: finished(finished)
, cleaner(&FutureQueue::waiter, this)
{}
~FutureQueue()
{
cleaner.join();
}
template<typename T>
void addFuture(T&& lambda)
{
std::unique_lock<std::mutex> lock(mutex);
futures.push_back(std::async(std::launch::async, std::move(lambda)));
cond.notify_one();
}
};