Подтвердить что ты не робот

Packaged_task висит на операторе()

Компиляция с gcc 4.7.2 на Ubuntu, скомпилированная с -std=c++11 -O0 -pthread, я каким-то образом создал тупик в коде, который не кажется, что он должен когда-либо сталкиваться с этой проблемой. У меня есть поток, который просто получает блокировку, а затем проходит через vector<function<void()>>, вызывая все. Между тем, основной поток нажимает на него std::packaged_task<int()> один за другим и блокируется, когда возвращается эта задача future. Сами задачи тривиальны (печать и возврат).

Вот полный код. Запуск приложения иногда преуспевает, но внутри нескольких попыток зависает:

#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <functional>

std::unique_lock<std::mutex> lock() {
    static std::mutex mtx;
    return std::unique_lock<std::mutex>{mtx};
}

int main(int argc, char** argv)
{
    std::vector<std::function<void()>> messages;
    std::atomic<bool> running{true};

    std::thread thread = std::thread([&]{
        while (running) {
            auto lk = lock();
            std::cout << "[T] locked with " << messages.size() << " messages." << std::endl;
            for (auto& fn: messages) {
                fn();
            }   
            messages.clear();
        }   
    }); 

    for (int i = 0; i < 1000000; ++i) {
        std::packaged_task<int()> task([=]{
            std::cout << "[T] returning " << i << std::endl;
            return i;
        }); 

        {   
            auto lk = lock();
            messages.emplace_back(std::ref(task));
        }   

        task.get_future().get();
    }   

    running = false;
    thread.join();
}

Пример вывода:

[T] returning 127189
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 1 messages.
[T] returning 127190
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 1 messages.
[T] returning 127191
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 0 messages.
[T] locked with 1 messages.
... hangs forever ...

Что происходит? Почему вызов в packaged_task::operator() зависает? Где тупик? Это ошибка gcc?

[ update] В тупике два потока:

Тема 1 (строка 39 - строка task.get_future().get()):

#0  [email protected]@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:162
#1  0x00007feb01fe800c in __gthread_cond_wait (this=Unhandled dwarf expression opcode 0xf3
)
    at [snip]/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:879
#2  std::condition_variable::wait (this=Unhandled dwarf expression opcode 0xf3
) at [snip]/gcc-4.7.2/libstdc++-v3/src/c++11/condition_variable.cc:52
#3  0x0000000000404aff in void std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x6111e0, __lock=..., __p=...)
    at [snip]gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/condition_variable:93
#4  0x0000000000404442 in std::__future_base::_State_base::wait (this=0x6111a8)
    at [snip]gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:331
#5  0x00000000004060fb in std::__basic_future<int>::_M_get_result (this=0x7fffc451daa0)
    at [snip]gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:601
#6  0x0000000000405488 in std::future<int>::get (this=0x7fffc451daa0)
    at [snip]gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:680
#7  0x00000000004024dc in main (argc=1, argv=0x7fffc451dbb8) at test.cxx:39

и Thread 2 (строка 22 - строка fn()):

#0  pthread_once () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_once.S:95
#1  0x00000000004020f6 in __gthread_once (__once=0x611214, __func=0x401e68 <[email protected]>)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x0000000000404db1 in void std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()()> >&&, std::reference_wrapper<bool>&&) (__once=..., [email protected])
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/mutex:819
#3  0x0000000000404517 in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()()>, bool) (this=0x6111a8, __res=..., __ignore_failure=false)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:362
#4  0x0000000000407af0 in std::__future_base::_Task_state<int ()()>::_M_run() (this=0x6111a8)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:1271
#5  0x00000000004076cc in std::packaged_task<int ()()>::operator()() (this=0x7fffc451da30)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/future:1379
#6  0x000000000040745a in std::_Function_handler<void ()(), std::reference_wrapper<std::packaged_task<int ()()> > >::_M_invoke(std::_Any_data const&) (
    __functor=...) at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/functional:1956
#7  0x00000000004051f2 in std::function<void ()()>::operator()() const (this=0x611290)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/functional:2311
#8  0x000000000040232f in operator() (__closure=0x611040) at test.cxx:22
#9  0x0000000000403d8e in _M_invoke<> (this=0x611040)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/functional:1598
#10 0x0000000000403cdb in operator() (this=0x611040)
    at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/functional:1586
#11 0x0000000000403c74 in _M_run (this=0x611028) at [snip]/gcc-4.7.2/lib/gcc/x86_64-unknown-linux-gnu/4.7.2/../../../../include/c++/4.7.2/thread:115
#12 0x00007feb01feae10 in execute_native_thread_routine (__p=Unhandled dwarf expression opcode 0xf3
) at [snip]/gcc-4.7.2/libstdc++-v3/src/c++11/thread.cc:73
#13 0x00007feb018879ca in start_thread (arg=<value optimized out>) at pthread_create.c:300
#14 0x00007feb015e569d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112
#15 0x0000000000000000 in ?? ()
4b9b3361

Ответ 1

Кажется, проблема в том, что вы уничтожаете packaged_task, возможно, до того, как operator() вернется в рабочий поток. Это, скорее всего, поведение undefined. Программа отлично подходит для меня, если я повторно использую мьютекс в цикле, ожидая, когда будущее вернет результат. Это сериализует operator() и деструктор packaged_task.

Ответ 2

Я не могу объяснить, почему ваш код был сломан, но я нашел способ его исправить (сохранение задач, а не std::functions, построенных из задач):

#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <functional>
#include <unistd.h>

int main(int argc, char** argv)
{
    // Let face it - your lock() function was kinda weird.
    std::mutex mtx;

    // I've changed this to a vector of tasks, from a vector
    // of functions. Seems to have done the job. Not sure exactly
    // why but this seems to be the proper way to go.
    std::vector<std::packaged_task<int()>> messages;

    std::atomic<bool> running{true};

    std::thread thread([&]{
        while (running) {
            std::unique_lock<std::mutex> l{mtx};
            std::cout << "[T] locked with " << messages.size() << " messages." << std::endl;
            for (auto& fn: messages) {
                fn();
            }
            messages.clear();
        }
    });

    for (int i = 0; i < 1000000; ++i) {
        std::packaged_task<int()> task([i]{
            std::cout << "[T] returning " << i << std::endl;
            return i;
        });

        // Without grabbing this now, if the thread executed fn()
        // before I do f.get() below, it complained about having
        // no shared state.
        std::future<int> f = task.get_future();

        {
            std::unique_lock<std::mutex> l{mtx};
            messages.emplace_back(std::move(task));
        }

        f.get();
    }

    running = false;
    thread.join();
}

По крайней мере, если этот код также тупик, то он еще не для меня.