Подтвердить что ты не робот

Как мне обращаться с мьютексами в подвижных типах на С++?

По дизайну std::mutex не движется и не копируется. Это означает, что класс A, который содержит мьютекс, не получит конструктор по умолчанию-move.

Как сделать этот тип A подвижным поточно-безопасным способом?

4b9b3361

Ответ 1

Начнем с кода:

class A
{
    using MutexType = std::mutex;
    using ReadLock = std::unique_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

    mutable MutexType mut_;

    std::string field1_;
    std::string field2_;

public:
    ...

Я добавил несколько довольно надуманных псевдонимов типов, которые мы не будем использовать в С++ 11, но стали намного полезнее в С++ 14. Будьте терпеливы, мы доберемся туда.

Ваш вопрос сводится к следующему:

Как написать конструктор перемещения и переместить оператор присваивания для этого класса?

Мы начнем с конструктора перемещения.

Переместить конструктор

Обратите внимание, что элемент mutex выполнен mutable. Строго говоря, это не обязательно для участников движения, но я предполагаю, что вы также хотите копировать участников. Если это не так, нет необходимости делать мьютекс mutable.

При построении A вам не нужно блокировать this->mut_. Но вам нужно заблокировать mut_ объекта, из которого вы строите (переместить или скопировать). Это можно сделать так:

    A(A&& a)
    {
        WriteLock rhs_lk(a.mut_);
        field1_ = std::move(a.field1_);
        field2_ = std::move(a.field2_);
    }

Обратите внимание, что сначала нам нужно было по умолчанию сконструировать элементы this, а затем назначить их значения только после того, как a.mut_ заблокирован.

Назначение перемещения

Оператор присваивания перемещения существенно сложнее, потому что вы не знаете, обращается ли какой-либо другой поток к lhs или rhs выражения присваивания. И вообще, вам нужно защититься от следующего сценария:

// Thread 1
x = std::move(y);

// Thread 2
y = std::move(x);

Вот оператор назначения перемещения, который правильно защищает описанный выше сценарий:

    A& operator=(A&& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            WriteLock rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = std::move(a.field1_);
            field2_ = std::move(a.field2_);
        }
        return *this;
    }

Обратите внимание, что для блокировки двух мьютексов следует использовать std::lock(m1, m2), а не просто блокировать их один за другим. Если вы блокируете их один за другим, тогда, когда два потока назначают два объекта в противоположном порядке, как показано выше, вы можете получить тупик. Точка std::lock заключается в том, чтобы избежать этого тупика.

Копировать конструктор

Вы не спрашивали о членах копии, но теперь мы можем поговорить о них (если не вы, кому-то они понадобятся).

    A(const A& a)
    {
        ReadLock  rhs_lk(a.mut_);
        field1_ = a.field1_;
        field2_ = a.field2_;
    }

Конструктор копирования похож на конструктор перемещения, за исключением того, что вместо WriteLock используется псевдоним ReadLock. В настоящее время эти оба псевдонима std::unique_lock<std::mutex>, и поэтому на самом деле это не имеет никакого значения.

Но в С++ 14 у вас будет возможность сказать следующее:

    using MutexType = std::shared_timed_mutex;
    using ReadLock  = std::shared_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

Это может быть оптимизация, но не определенно. Вы должны будете измерить, чтобы определить, есть ли это. Но с этим изменением можно копировать конструкцию из одного и того же числа в нескольких потоках одновременно. Решение С++ 11 заставляет вас делать такие потоки последовательными, даже если rhs не изменяется.

Назначение копирования

Для полноты, вот оператор присваивания копии, который должен быть достаточно понятным после прочтения всего остального:

    A& operator=(const A& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            ReadLock  rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = a.field1_;
            field2_ = a.field2_;
        }
        return *this;
    }

И т.д.

Любые другие члены или свободные функции, которые получают доступ к состоянию A, также должны быть защищены, если вы ожидаете, что несколько потоков смогут сразу вызвать их. Например, здесь swap:

    friend void swap(A& x, A& y)
    {
        if (&x != &y)
        {
            WriteLock lhs_lk(x.mut_, std::defer_lock);
            WriteLock rhs_lk(y.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            using std::swap;
            swap(x.field1_, y.field1_);
            swap(x.field2_, y.field2_);
        }
    }

Обратите внимание, что если вы просто зависите от std::swap выполнения задания, блокировка будет неточной детализации, блокировки и разблокировки между тремя ходами, которые std::swap будет выполнять внутренне.

В самом деле, мышление о swap может дать вам представление о API, который может потребоваться для обеспечения "потокобезопасности" A, который в целом будет отличаться от "небезобезопасного" API, из-за проблемы "блокировки детализации".

Также обратите внимание на необходимость защиты от "самообмена". "self-swap" должен быть не-op. Без самопроверки рекурсивно блокирует один и тот же мьютекс. Это также можно было бы решить без самопроверки, используя std::recursive_mutex для MutexType.

Обновление

В комментариях ниже Yakk довольно недоволен необходимостью по умолчанию строить вещи в конструкциях копирования и перемещения (и у него есть точка). Если вы достаточно решительно относитесь к этой проблеме, так что вы готовы потратить на нее память, вы можете избежать ее так:

  • Добавьте все типы блокировок, которые вам нужны в качестве элементов данных. Эти члены должны находиться перед защищаемыми данными:

    mutable MutexType mut_;
    ReadLock  read_lock_;
    WriteLock write_lock_;
    // ... other data members ...
    
  • И затем в конструкторах (например, в конструкторе копирования) выполните следующее:

    A(const A& a)
        : read_lock_(a.mut_)
        , field1_(a.field1_)
        , field2_(a.field2_)
    {
        read_lock_.unlock();
    }
    

Упс, Якк удалил свой комментарий, прежде чем у меня была возможность завершить это обновление. Но он заслуживает признания за то, что он подтолкнул эту проблему и принял решение в этом ответе.

Обновление 2

И dyp придумал это хорошее предложение:

    A(const A& a)
        : A(a, ReadLock(a.mut_))
    {}
private:
    A(const A& a, ReadLock rhs_lk)
        : field1_(a.field1_)
        , field2_(a.field2_)
    {}

Ответ 2

Учитывая, что это не похоже на хороший, чистый и простой способ ответить на этот вопрос - решение Антона я считаю правильным, но его решительно спорный, если не будет лучшего ответа, я бы рекомендовал поставить такой класс в кучу и глядя через него через std::unique_ptr:

auto a = std::make_unique<A>();

Теперь это полностью подвижный тип, и любой, у кого есть блокировка внутреннего мьютекса, пока происходит какое-то действие, по-прежнему безопасна, даже если его обсуждают, хорошо ли это делать

Если вам нужна семантика копирования, просто используйте

auto a2 = std::make_shared<A>();

Ответ 3

Это перевернутый ответ. Вместо того, чтобы внедрять "эти объекты нужно синхронизировать" в качестве базы типа, вместо этого вводить его под любой тип.

Вы имеете дело с синхронизированным объектом по-разному. Одна из серьезных проблем - вы должны беспокоиться о взаимоблокировках (блокировка нескольких объектов). Это также никогда не должно быть вашей "стандартной версией объекта": синхронизированные объекты предназначены для объектов, которые будут в конфликте, и ваша цель должна заключаться в том, чтобы свести к минимуму конфликт между потоками, а не подметать его под ковром.

Но синхронизирующие объекты по-прежнему полезны. Вместо наследования от синхронизатора мы можем написать класс, который обертывает произвольный тип при синхронизации. Пользователи должны перейти через несколько обручей, чтобы выполнять операции над объектом теперь, когда они синхронизированы, но они не ограничены каким-то ограниченным набором операций с ограниченным ручным управлением для объекта. Они могут составлять несколько операций над объектом в один или работать с несколькими объектами.

Вот синхронизированная обертка вокруг произвольного типа T:

template<class T>
struct synchronized {
  template<class F>
  auto read(F&& f) const&->std::result_of_t<F(T const&)> {
    return access(std::forward<F>(f), *this);
  }
  template<class F>
  auto read(F&& f) &&->std::result_of_t<F(T&&)> {
    return access(std::forward<F>(f), std::move(*this));
  }
  template<class F>
  auto write(F&& f)->std::result_of_t<F(T&)> {
    return access(std::forward<F>(f), *this);
  }
  // uses `const` ness of Syncs to determine access:
  template<class F, class... Syncs>
  friend auto access( F&& f, Syncs&&... syncs )->
  std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
  {
    return access2( std::index_sequence_for<Syncs...>{}, std::forward<F>(f), std::forward<Syncs>(syncs)... );
  };
  synchronized(synchronized const& o):t(o.read([](T const&o){return o;})){}
  synchronized(synchronized && o):t(std::move(o).read([](T&&o){return std::move(o);})){}  
  // special member functions:
  synchronized( T & o ):t(o) {}
  synchronized( T const& o ):t(o) {}
  synchronized( T && o ):t(std::move(o)) {}
  synchronized( T const&& o ):t(std::move(o)) {}
  synchronized& operator=(T const& o) {
    write([&](T& t){
      t=o;
    });
    return *this;
  }
  synchronized& operator=(T && o) {
    write([&](T& t){
      t=std::move(o);
    });
    return *this;
  }
private:
  template<class X, class S>
  static auto smart_lock(S const& s) {
    return std::shared_lock< std::shared_timed_mutex >(s.m, X{});
  }
  template<class X, class S>
  static auto smart_lock(S& s) {
    return std::unique_lock< std::shared_timed_mutex >(s.m, X{});
  }
  template<class L>
  static void lock(L& lockable) {
      lockable.lock();
  }
  template<class...Ls>
  static void lock(Ls&... lockable) {
      std::lock( lockable... );
  }
  template<size_t...Is, class F, class...Syncs>
  friend auto access2( std::index_sequence<Is...>, F&&f, Syncs&&...syncs)->
  std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
  {
    auto locks = std::make_tuple( smart_lock<std::defer_lock_t>(syncs)... );
    lock( std::get<Is>(locks)... );
    return std::forward<F>(f)(std::forward<Syncs>(syncs).t ...);
  }

  mutable std::shared_timed_mutex m;
  T t;
};
template<class T>
synchronized< T > sync( T&& t ) {
  return {std::forward<T>(t)};
}
Включены функции

С++ 14 и С++ 1z.

это предполагает, что операции const являются безопасными с несколькими считывателями (что и предполагает контейнер std).

Использование выглядит так:

synchronized<int> x = 7;
x.read([&](auto&& v){
  std::cout << v << '\n';
});

для int с синхронизированным доступом.

Я бы посоветовал не иметь synchronized(synchronized const&). Это редко необходимо.

Если вам понадобится synchronized(synchronized const&), у меня возникнет соблазн заменить T t; на std::aligned_storage, позволяя построить ручное размещение и ручное уничтожение. Это позволяет правильно управлять жизненным циклом.

За исключением этого, мы могли бы скопировать источник T, а затем прочитать его:

synchronized(synchronized const& o):
  t(o.read(
    [](T const&o){return o;})
  )
{}
synchronized(synchronized && o):
  t(std::move(o).read(
    [](T&&o){return std::move(o);})
  )
{}

для назначения:

synchronized& operator=(synchronized const& o) {
  access([](T& lhs, T const& rhs){
    lhs = rhs;
  }, *this, o);
  return *this;
}
synchronized& operator=(synchronized && o) {
  access([](T& lhs, T&& rhs){
    lhs = std::move(rhs);
  }, *this, std::move(o));
  return *this;
}
friend void swap(synchronized& lhs, synchronized& rhs) {
  access([](T& lhs, T& rhs){
    using std::swap;
    swap(lhs, rhs);
  }, *this, o);
}

размещение и выровненные версии хранилищ немного беспорядочны. Большая часть доступа к T будет заменена функцией-членом T&t() и T const&t()const, кроме как при построении, где вам придется перепрыгивать через некоторые обручи.

Сделав synchronized оболочку вместо части класса, все, что мы должны убедиться, состоит в том, что класс внутренне уважает const как многозадачность и записывает его однопоточным образом.

В случаях редких нам нужен синхронизированный экземпляр, мы переходим через обручи, как указано выше.

Извините за любые опечатки в приведенном выше. Вероятно, есть некоторые.

Боковое преимущество вышеизложенного заключается в том, что n-арные произвольные операции над объектами synchronized (одного и того же типа) работают вместе, без необходимости жестко кодировать его перед началом работы. Добавить в объявление друга и n-ary synchronized объекты нескольких типов могут работать вместе. Мне может потребоваться переместить access из встроенного друга, чтобы иметь дело с перегрузками в этом случае.

живой пример

Ответ 4

Использование мьютексов и семантики перемещения С++ - отличный способ безопасно и эффективно передавать данные между потоками.

Представьте себе поток "производителя", который производит партии строк и предоставляет их (одному или нескольким) потребителям. Эти партии могут быть представлены объектом, содержащим (потенциально большие) объекты std::vector<std::string>. Мы абсолютно хотим "переместить" внутреннее состояние этих векторов в своих потребителей без ненужного дублирования.

Вы просто распознаете мьютекс как часть объекта, не являющегося частью состояния объекта. То есть вы не хотите перемещать мьютексы.

Необходимая блокировка зависит от вашего алгоритма или от того, насколько обобщены ваши объекты и какой диапазон использования вы разрешаете.

Если вы только переходите от объекта "производитель" к общему объекту "потребляющего" потока, вы можете быть в порядке, чтобы заблокировать только перемещенный объект из.

Если это более общий дизайн, вам нужно будет заблокировать оба. В таком случае вам нужно подумать о блокировке блокировки.

Если это потенциальная проблема, используйте std::lock() для получения блокировок на обоих мьютексах без блокировки.

http://en.cppreference.com/w/cpp/thread/lock

В качестве последней заметки вам нужно убедиться, что вы понимаете семантику перемещения. Напомним, что перемещенный из объекта остается в действительном, но неизвестном состоянии. Вполне возможно, что поток, не выполняющий перемещение, имеет вескую причину для попытки доступа к перемещенному объекту, когда он может найти это действительное, но неизвестное состояние.

Опять мой продюсер просто избивает строки, и потребитель убирает всю нагрузку. В этом случае каждый раз, когда производитель пытается добавить к вектору, он может найти вектор непустым или пустым.

Короче, если потенциальный параллельный доступ к перемещенному из объекта сумме к записи, вероятно, будет ОК. Если это составляет прочитанное, тогда подумайте, почему это нормально читать произвольное состояние.

Ответ 5

Прежде всего, в вашем дизайне должно быть что-то не так, если вы хотите переместить объект, содержащий мьютекс.

Но если вы все равно решите это сделать, вам нужно создать новый мьютекс в конструкторе перемещения, например:

// movable
struct B{};

class A {
    B b;
    std::mutex m;
public:
    A(A&& a)
        : b(std::move(a.b))
        // m is default-initialized.
    {
    }
};

Это потокобезопасно, потому что конструктор перемещения может смело предположить, что его аргумент не используется нигде, поэтому блокировка аргумента не требуется.