Подтвердить что ты не робот

Golang: Лучший способ реализовать глобальные счетчики для высококонкурентных приложений?

Каков наилучший способ реализации глобальных счетчиков для высококонкурентного приложения? В моем случае у меня может быть 10K-20K выходить, выполняя "работу", и я хочу подсчитать количество и типы элементов, которые подпрограммы работают совместно...

"Классический" синхронный стиль кодирования будет выглядеть так:

var work_counter int

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter,1)
    }    
}

Теперь это усложняется, потому что я хочу отслеживать "тип" выполняемой работы, поэтому мне действительно нужно что-то вроде этого:

var work_counter map[string]int
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

Кажется, что должен быть оптимизированный путь "go" с использованием каналов или чего-то подобного:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewher else
func GoCounterRoutine() {
    for {
        select {
            case c := <- work_chan:
                work_counter += c
                break
        }
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

В этом последнем примере по-прежнему отсутствует карта, но этого достаточно легко добавить. Будет ли этот стиль обеспечивать лучшую производительность, чем просто простой атомный приращение? Я не могу сказать, является ли это более или менее сложным, когда мы говорим о параллельном доступе к глобальному значению по сравнению с чем-то, что может блокировать выполнение ввода-вывода...

Мысли оценены.

Обновление 5/28/2013:

Я протестировал пару реализаций, и результаты были не такими, как я ожидал, здесь мой исходный код счетчика:

package helpers

import (
)

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct,0)
    counterQueryChan = make(chan CounterQueryStruct,100)
    counterListChan = make(chan chan map[string]int,100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
            case ci := <- counterIncrementChan:
                if len(ci.bucket)==0 { return }
                counter[ci.bucket]+=ci.value
                break
            case cq := <- counterQueryChan:
                val,found:=counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1    
                }
                break
            case cl := <- counterListChan:
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
                break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket)==0 || counter==0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}

func CounterQuery(bucket string) int {
    if len(bucket)==0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket,reply}
    return <- reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <- reply
}

Он использует каналы для записи и чтения, которые кажутся логичными.

Вот мои тестовые примеры:

func bcRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()
    CounterInitialize()
    e:=make(chan bool)
    b.StartTimer()

    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket]+=value
    mux.Unlock()
}

func bmRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m=make(map[string]int)
    e:=make(chan bool)
    b.StartTimer()

    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }

    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

Я реализовал простой бенчмарк с помощью всего лишь мьютекса вокруг карты (только для тестирования) и сравнивал с 5 горутами, которые работают параллельно. Вот результаты:

$ go test --bench=. helpers
PASS
BenchmarkChannels         100000             15560 ns/op
BenchmarkMutex   1000000              2669 ns/op
ok      helpers 4.452s

Я бы не ожидал, что мьютексы будут намного быстрее...

Другие мысли?

4b9b3361

Ответ 1

Не используйте sync/atomic - со страницы с закладками

Пакет Atom обеспечивает примитивы атомной памяти на низком уровне, полезные для реализация алгоритмов синхронизации. Эти функции требуют правильной работы. За исключением специальные, низкоуровневые приложения, синхронизация лучше выполняется с помощью каналов или средств пакета синхронизации.

В прошлый раз, когда я должен был это сделать, я сравнил то, что было похоже на ваш второй пример с мьютексом и что-то похожее на ваш третий пример с канал. Коды каналов выиграли, когда дела действительно заняты, но убедитесь, что вы делаете буфер канала большим.

Ответ 2

Если вы пытаетесь синхронизировать пул работников (например, разрешить n goroutines хруститься при некотором количестве работы), тогда каналы - очень хороший способ обойти это, но если все, что вам действительно нужно, - это счетчик ( например, просмотры страниц), то они переполнены. sync и sync/atomic пакеты помогут.

import "sync/atomic"

type count32 int32

func (c *count32) increment() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

Перейти к Пример игровой площадки

Ответ 3

Не бойтесь использовать мьютексы и блокировки только потому, что считаете, что они "не правильно идут". В вашем втором примере совершенно ясно, что происходит, и это очень важно. Вам придется самому попробовать, чтобы узнать, насколько доволен этим мьютексом, и добавит ли добавление усложнение производительность.

Если вам нужна повышенная производительность, возможно, оштрафование - лучший способ: http://play.golang.org/p/uLirjskGeN

Недостатком является то, что ваши подсчеты будут только такими актуальными, как решает ваш осколок. Также могут быть хиты производительности при вызове time.Since() столько, но, как всегда, сначала измерьте:)

Ответ 4

Последний был близок:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    go GoCounterRoutine(ch)
    go GoWorkerRoutine(1, ch)
    // not run as goroutine because mein() would just end
    GoWorkerRoutine(2, ch)

}

// started somewhere else
func GoCounterRoutine(ch chan int) {
    counter := 0
    for {
        ch <- counter
        counter += 1
    }
}

func GoWorkerRoutine(n int, ch chan int) {
    var seq int
    for seq := range ch {
        // do work:
        fmt.Println(n, seq)
    }
}

Это вводит одну точку отказа: если счетчик горутин умирает, все теряется. Это может быть не проблема, если все goroutine выполняются на одном компьютере, но могут стать проблемой, если они разбросаны по сети. Чтобы сделать счетчик иммунным к отказам одиночных узлов в кластере, специальные алгоритмы должны использоваться.

Ответ 5

Другой ответ, использующий sync/atomic, подходит для таких вещей, как счетчики страниц, но не для отправки уникальных идентификаторов внешнему API. Для этого вам понадобится операция "increment-and-return", которая может быть реализована только как цикл CAS.

Здесь цикл CAS вокруг int32 для генерации уникальных идентификаторов сообщений:

import "sync/atomic"

type UniqueID struct {
    counter int32
}

func (c *UniqueID) Get() int32 {
    for {
        val := atomic.LoadInt32(&c.counter)
        if atomic.CompareAndSwapInt32(&c.counter, val, val+1) {
            return val
        }
    }
}

Чтобы использовать его, просто выполните:

requestID := client.msgID.Get()
form.Set("id", requestID)

Это имеет преимущество перед каналами в том, что для него не требуется столько лишних ресурсов - существующие goroutines используются, поскольку они запрашивают идентификаторы, а не используют один goroutine для каждого счетчика, который требуется вашей программе.

TODO: контрольный показатель против каналов. Я собираюсь угадать, что каналы хуже в случае отсутствия конкуренции и лучше в случае с высоким уровнем конкуренции, поскольку они имеют очередь, в то время как этот код просто вращается, пытаясь выиграть гонку.

Ответ 6

Я реализовал это с помощью простой map + mutex, которая, как представляется, является лучшим способом справиться с этим, поскольку это "самый простой способ" (это то, что Go говорит, чтобы использовать для выбора блокировок и каналов).

package main

import (
    "fmt"
    "sync"
)

type single struct {
    mu     sync.Mutex
    values map[string]int64
}

var counters = single{
    values: make(map[string]int64),
}

func (s *single) Get(key string) int64 {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.values[key]
}

func (s *single) Incr(key string) int64 {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.values[key]++
    return s.values[key]
}

func main() {
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Get("foo"))
    fmt.Println(counters.Get("bar"))

}

Вы можете запустить код https://play.golang.org/p/9bDMDLFBAY. Я сделал простую упакованную версию на gist.github.com