Подтвердить что ты не робот

С++ эквивалентно Java BlockingQueue

Я переношу некоторый код Java на С++, и в одном конкретном разделе используется BlockingQueue для передачи сообщений от многих производителей одному потребителю.

Если вы не знакомы с тем, что такое Java BlockingQueue, это просто очередь, которая имеет жесткую емкость, которая предоставляет потокобезопасные методы для put() и take() из очереди. put(), если очередь заполнена, и принимать() блоки, если очередь пуста. Кроме того, поставляются таймерные версии этих методов.

Тайм-ауты имеют отношение к моему прецеденту, поэтому рекомендация, которая поставляет их, идеальна. Если нет, я могу сам составить код.

Я googled вокруг и быстро просматривал библиотеки Boost, и я не нахожу ничего подобного. Может быть, я слепой здесь... но кто-нибудь знает хорошие рекомендации?

Спасибо!

4b9b3361

Ответ 1

Это не фиксированный размер и он не поддерживает тайм-ауты, но вот простая реализация очереди, которую я недавно опубликовал с использованием конструкций С++ 2011:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

Должно быть тривиально расширять и использовать время ожидания для всплытия. Основная причина, по которой я этого не сделал, - это то, что я не доволен выборами интерфейса, о которых я думал до сих пор.

Ответ 2

Вот пример очереди блокировки с функцией запроса на отключение :

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};

Ответ 3

Хорошо, я немного опоздал на вечеринку, но я думаю, что это лучше подходит для реализации Java BlockingQueue. Здесь я тоже использую один мьютекс и два условия, чтобы присматривать за не полностью и не пусто. IMO a BlockingQueue имеет больше смысла с ограниченными возможностями, которых я не видел в других ответах. Я также включил простой тестовый сценарий:

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}