Подтвердить что ты не робот

Как использовать RWMutex в Голанге?

type Stat struct {
    counters     map[string]*int64
    countersLock sync.RWMutex
    averages     map[string]*int64
    averagesLock sync.RWMutex
}

Он называется ниже

func (s *Stat) Count(name string) {
    s.countersLock.RLock()
    counter := s.counters[name]
    s.countersLock.RUnlock()
    if counter != nil {
        atomic.AddInt64(counter, int64(1))
        return
    }
}

Я понимаю, что мы сначала блокируем приемник s (который является типом Stat), а затем добавляем к нему, если счетчик существует.

Вопросы:

Q1: зачем нам блокировать его? Что означает RWMutex?

Q2: s.countersLock.RLock() - блокирует ли он весь приемник s или только поле счетчиков в типе Stat?

Q3: s.countersLock.RLock() - блокирует ли это поле средних?

Q4: Почему мы должны использовать RWMutex? Я думал, что канал был предпочтительным способом обработки concurrency в Голанге?

Q5: Что это за atomic.AddInt64. Почему в этом случае нам нужен атомный?

Q6: Почему мы должны разблокировать право до того, как мы добавим к нему?

4b9b3361

Ответ 1

Вопросы:

Q1: зачем нам блокировать его? Что означает RWMutex?

RW означает чтение/запись. CF doc: http://golang.org/pkg/sync/#RWMutex.

Нам нужно заблокировать его, чтобы другие подпрограммы/нитки не изменяли значение при его обработке.

Q2: s.countersLock.RLock() - блокирует ли он весь приемник или только поле счетчиков в типе Stat?

Как мьютекс, блокировка происходит только тогда, когда вы вызываете функцию RLock(). Если какой-либо другой горутин уже назвал WLock(), он блокирует. Вы можете вызвать любое число RLock() в пределах одного и того же goroutine, оно не будет заблокировано.

Поэтому он не блокирует другие поля, даже s.counters. В вашем примере вы заблокируете поиск карты, чтобы найти правильный счетчик.

Q3: s.countersLock.RLock() - блокирует ли это поле средних?

Нет, как сказано в Q2, a RLock блокирует только себя.

Q4: Почему мы должны использовать RWMutex? Я думал, что канал является предпочтительным способом обрабатывать concurrency в Голанге?

Канал очень полезен, но иногда этого недостаточно, и иногда это не имеет смысла.

Здесь, когда вы блокируете доступ к карте, мьютекс имеет смысл. С chan, вы должны иметь буферизованный chan из 1, отправлять до и получать после. Не очень интуитивно.

Q5: Что это за atomic.AddInt64. Почему в этом случае нам нужен атом?

Эта функция будет увеличивать заданную переменную атомарным способом. В вашем случае у вас есть условие гонки: counter - это указатель, и фактическая переменная может быть уничтожена после освобождения блокировки и перед вызовом atomic.AddInt64. Если вы не знакомы с такими вещами, я бы посоветовал вам придерживаться Mutexes и выполнять всю необходимую обработку между блокировкой/разблокировкой.

Q6: Почему мы должны разблокировать право до того, как мы добавим к нему? Вы не должны.

Я не знаю, что вы пытаетесь сделать, но вот пример (простой): https://play.golang.org/p/cVFPB-05dw

Ответ 2

Когда более одного потока * необходимо изменить одно и то же значение, необходим механизм блокировки для синхронизации доступа. Без него два или более потока * могут одновременно записываться в одно и то же значение, что приводит к поврежденной памяти, которая обычно приводит к сбою.

Пакет atomic обеспечивает быстрый и простой способ синхронизации доступа к примитивным значениям. Для счетчика это самый быстрый метод синхронизации. Он имеет методы с четко определенными вариантами использования, такими как приращение, уменьшение, свопинг и т.д.

Пакет sync обеспечивает способ синхронизации доступа к более сложным значениям, таким как карты, срезы, массивы или группы значений, Вы используете это для случаев использования, которые не определены в atomic.

В любом случае блокировка требуется только при записи. Несколько потоков * могут безопасно считывать одно и то же значение без механизма блокировки.

Давайте посмотрим на предоставленный вами код.

type Stat struct {
    counters     map[string]*int64
    countersLock sync.RWMutex
    averages     map[string]*int64
    averagesLock sync.RWMutex
}

func (s *Stat) Count(name string) {
    s.countersLock.RLock()
    counter := s.counters[name]
    s.countersLock.RUnlock()
    if counter != nil {
        atomic.AddInt64(counter, int64(1))
        return
    }
}

Здесь отсутствует то, как инициализируются сами карты. И пока карты не мутируются. Если имена счетчиков предопределены и не могут быть добавлены позднее, вам не понадобится RWMutex. Этот код может выглядеть примерно так:

type Stat struct {
    counters map[string]*int64
}

func InitStat(names... string) Stat {
    counters := make(map[string]*int64)
    for _, name := range names {
        counter := int64(0)
        counters[name] = &counter
    }
    return Stat{counters}
}

func (s *Stat) Count(name string) int64 {
    counter := s.counters[name]
    if counter == nil {
        return -1 // (int64, error) instead?
    }
    return atomic.AddInt64(counter, 1)
}

(Примечание: я удалил средние значения, потому что он не использовался в исходном примере.)

Теперь, скажем, вы не хотели, чтобы ваши счетчики были предопределены. В этом случае вам понадобится мьютекс для синхронизации доступа.

Давайте попробуем его с помощью Mutex. Это просто, потому что только один поток * может удерживать Lock за раз. Если второй поток * пытается Lock перед первым выпуском их с Unlock, он ждет (или блокирует) ** до тех пор.

type Stat struct {
    counters map[string]*int64
    mutex    sync.Mutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]*int64)}
}

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter
    }
    s.mutex.Unlock()
    return atomic.AddInt64(counter, 1)
}

Приведенный выше код будет работать отлично. Но есть две проблемы.

  • Если между Lock() и Unlock() есть паника, мьютекс будет заблокирован навсегда, даже если вы должны восстановиться после паники. Этот код, вероятно, не будет паниковать, но в целом лучше использовать его, чтобы он мог.
  • При извлечении счетчика выполняется исключительная блокировка. Только один поток * может считывать со счетчика за один раз.

Проблема №1 легко решить. Используйте defer:

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter
    }
    return atomic.AddInt64(counter, 1)
}

Это гарантирует, что Unlock() всегда вызывается. И если по какой-то причине у вас есть больше одного возврата, вам нужно только указать Unlock() один раз во главе функции.

Проблема №2 может быть решена с помощью RWMutex. Как это работает, и почему оно полезно?

RWMutex является расширением Mutex и добавляет два метода: RLock и RUnlock. Есть несколько моментов, которые важно отметить RWMutex:

  • RLock является общей блокировкой чтения. Когда с ним фиксируется замок, другие потоки * также могут использовать свой собственный замок с RLock. Это означает, что несколько потоков * могут считываться одновременно. Это полуисключительно.

  • Если мьютекс заблокирован, вызов Lock заблокирован **. Если один или несколько читателей имеют блокировку, вы не можете писать.

  • Если мьютекс заблокирован для записи (с Lock), RLock будет блокировать **.

Хороший способ подумать об этом RWMutex - это Mutex с помощью считывателя. RLock увеличивает счетчик, а RUnlock уменьшает его, Вызов Lock будет заблокирован, пока этот счетчик будет > 0.

Возможно, вы думаете: если мое приложение будет читать тяжело, это означает, что писатель может быть заблокирован на неопределенный срок? Нет. Существует еще одно полезное свойство RWMutex:

  • Если счетчик чтения > 0 и Lock вызывается, будущие вызовы RLock также будет блокироваться до тех пор, пока существующие читатели не выпустят свои блокировки, автор получил свою блокировку и позже выпустил ее.

Подумайте об этом, так как свет над регистром в продуктовом магазине, где говорится, что кассир открыт или нет. Люди в очереди могут остаться там, и им будет помогать, но новые люди не могут войти в строй. Как только последний оставшийся клиент получает помощь, кассир переходит на перерыв, и этот регистр либо остается закрытым, пока они не вернутся, либо заменены другим кассиром.

Позволяет изменить предыдущий пример RWMutex:

type Stat struct {
    counters map[string]*int64
    mutex    sync.RWMutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]*int64)}
}

func (s *Stat) Count(name string) int64 {
    var counter *int64
    if counter = getCounter(name); counter == nil {
        counter = initCounter(name);
    }
    return atomic.AddInt64(counter, 1)
}

func (s *Stat) getCounter(name string) *int64 {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.counters[name]
}

func (s *Stat) initCounter(name string) *int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    counter := s.counters[name]
    if counter == nil {
        value := int64(0)
        counter = &value
        s.counters[name] = counter    
    }
    return counter
}

С помощью приведенного выше кода я разделил логику на функции getCounter и initCounter следующим образом:

  • Держите код понятным. Было бы сложно RLock() и Lock() в той же функции.
  • Освободите блокировки как можно раньше, используя отложенный.

В приведенном выше коде, в отличие от примера Mutex, вы можете одновременно увеличивать разные счетчики.

Еще одна вещь, которую я хотел отметить, - это все приведенные выше примеры, карта map[string]*int64 содержит указатели на счетчики, а не сами счетчики. Если вы должны были хранить счетчики на карте map[string]int64, вам нужно будет использовать Mutex без atomic. Этот код будет выглядеть примерно так:

type Stat struct {
    counters map[string]int64
    mutex    sync.Mutex
}

func InitStat() Stat {
    return Stat{counters: make(map[string]int64)}
}

func (s *Stat) Count(name string) int64 {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.counters[name]++
    return s.counters[name]
}

Возможно, вы захотите сделать это, чтобы уменьшить сбор мусора - но это будет иметь значение только в том случае, если у вас есть тысячи счетчиков, - и даже тогда сами счетчики не занимают много места (по сравнению с чем-то вроде байтового буфера).

* Когда я говорю "поток", я имею в виду "рутину". Нить на других языках - это механизм одновременного запуска одного или нескольких наборов кода. Нить дорогая для создания и срывания. Ходовая процедура построена поверх потоков, но повторно использует их. Когда рутинная спячка спит, основной поток может использоваться другой подпрограммой. Когда рутина просыпается, она может быть в другом потоке. Go обрабатывает все это за кулисами. - Но для всех целей и задач вы будете рассматривать рутину как поток, когда дело доходит до доступа к памяти. Тем не менее, вам не обязательно быть таким же консервативным при использовании go-подпрограмм, как и потоки.

** Когда ходовая процедура блокируется Lock, RLock, каналом или Sleep, базовая нить может быть повторно использована. Никакой процессор не используется этой рутиной - думайте об этом как о ожидании в очереди. Как и другие языки, бесконечный цикл, такой как for {}, блокируется, сохраняя процессор и занятый занятый, подумайте об этом, когда бегаете по кругу - вы получите головокружение, бросьте, а окружающих вас людей не будет очень счастлив.