Подтвердить что ты не робот

Могу ли я использовать std:: async, не дожидаясь будущего ограничения?

Высокий уровень
Я хочу вызвать некоторые функции без возвращаемого значения в асинхронном режиме, не дожидаясь их завершения. Если я использую std:: async, будущий объект не разрушает, пока задача не закончится, это сделает вызов несинхронизирующим в моем случае.

Пример

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.

Мои вопросы

  • Есть ли способ преодолеть это ограничение?
  • if (1) is no, следует ли реализовать один раз поток, который будет принимать эти фьючерсы "зомби" и ждать на них?
  • Является ли (1) и (2) нет, есть ли другой вариант, тогда просто создайте собственный пул потоков?

Примечание:
Я предпочитаю не использовать опцию thread + detach (предложенную @galop1n), так как при создании нового потока есть накладные расходы, которых я хочу избежать. При использовании std:: async (по крайней мере, на MSVC) используется пул внутренних потоков.

Спасибо.

4b9b3361

Ответ 1

Вы можете перенести будущее в глобальный объект, поэтому при запуске локального будущего деструктора ему не нужно ждать завершения асинхронного потока.

std::vector<std::future<void>> pending_futures;

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    // transfer the future shared state to a longer-lived future
    pending_futures.push_back(std::move(f));

    //returning the response ASAP to the client
    return myResponseType;

}

N.B. Это небезопасно, если асинхронный поток ссылается на любые локальные переменные в функции processRequest.

При использовании std::async (по крайней мере, на MSVC) используется внутренний пул потоков.

В действительности это не соответствует требованиям, стандарт явно указывает, что задачи, выполняемые с помощью std::launch::async, должны выполняться как в новом потоке, поэтому любые локальные переменные нитей не должны сохраняться из одной задачи в другую. Обычно это не имеет значения.

Ответ 2

почему вы не просто начинаете нить и отсоединяетесь, если не хотите присоединиться?

std::thread{ sendMail, address, message}.detach();   

std:: async привязан к времени жизни std:: future, который он возвращает, и их нет альтернативы этому.

Включение std:: future в очереди ожидания, прочитанной другим потоком, потребует того же механизма безопасности, что и пул, получающий новую задачу, например мьютекс вокруг контейнера.

Таким образом, ваш лучший вариант - это пул потоков, чтобы использовать задачи, непосредственно вложенные в поточную безопасную очередь. И это не будет зависеть от конкретной реализации.

Ниже реализации пула потоков, принимающих любые вызываемые аргументы и аргументы, потоки выполняют сеанс в очереди, лучшая реализация должна использовать переменные условия (coliru):

#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
    struct Task {
        virtual void Run() const = 0;
        virtual ~Task() {};
    };   

    template < typename task_, typename... args_ >
    struct RealTask : public Task {
        RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
        void Run() const override {
            fun_();
        }
    private:
        decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
    };

    template < typename task_, typename... args_ >
    void AddTask( task_&& task, args_&&... args ) {
        auto lock = std::unique_lock<std::mutex>{mtx_};
        using FinalTask = RealTask<task_, args_... >;
        q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
    }

    ThreadPool() {
        for( auto & t : pool_ )
            t = std::thread( [=] {
                while ( true ) {
                    std::unique_ptr<Task> task;
                    {
                        auto lock = std::unique_lock<std::mutex>{mtx_};
                        if ( q_.empty() && stop_ ) 
                            break;
                        if ( q_.empty() )
                            continue;
                        task = std::move(q_.front());
                        q_.pop();
                    }
                    if (task)
                        task->Run();
                }
            } );
    }
    ~ThreadPool() {
        {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            stop_ = true;
        }
        for( auto & t : pool_ )
            t.join();
    }
private:
    std::queue<std::unique_ptr<Task>> q_;
    std::thread pool_[8]; 
    std::mutex mtx_;
    volatile bool stop_ {};
};

void foo( int a, int b ) {
    std::cout << a << "." << b;
}
void bar( std::string const & s) {
    std::cout << s;
}

int main() {
    ThreadPool pool;
    for( int i{}; i!=42; ++i ) {
        pool.AddTask( foo, 3, 14 );    
        pool.AddTask( bar, " - " );    
    }
}

Ответ 3

Вместо того, чтобы переместить будущее в глобальный объект (и вручную управлять удалением неиспользуемых фьючерсов), вы можете перенести его в локальную область асинхронной функции.

"Пусть асинхронная функция принимает свое собственное будущее", так сказать.

Я придумал эту оболочку шаблона, которая работает для меня (протестирована в Windows):

#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                   std::future<void>&& is_valid, std::promise<void>&& is_moved) {
    is_valid.wait(); // Wait until the return value of std::async is written to "future"
    auto our_future = std::move(future); // Move "future" to a local variable
    is_moved.set_value(); // Only now we can leave void_async in the main thread

    // This is also used by std::async so that member function pointers work transparently
    auto functor = std::bind(f, std::forward<Args>(args)...);
    functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
    std::future<void> future; // This is for std::async return value
    // This is for our synchronization of moving "future" between threads
    std::promise<void> valid;
    std::promise<void> is_moved;
    auto valid_future = valid.get_future();
    auto moved_future = is_moved.get_future();

    // Here we pass "future" as a reference, so that async_wrapper
    // can later work with std::async return value
    future = std::async(
        async_wrapper<Function, Args...>,
        std::forward<Function>(f), std::forward<Args>(args)...,
        std::ref(future), std::move(valid_future), std::move(is_moved)
    );
    valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
    moved_future.wait(); // Wait for "future" to actually be moved
}

Я немного удивлен, что это работает, потому что я думал, что перемещенный будущий деструктор будет блокироваться, пока мы не покинем async_wrapper. Он должен ждать возвращения async_wrapper, но он ждет внутри этой самой функции. Логично, что это должен быть тупик, но это не так.

Я также попытался добавить строку в конец async_wrapper, чтобы вручную удалить будущий объект:

our_future = std::future<void>();

Это также не блокирует.