Подтвердить что ты не робот

Несколько горутов, прослушивающих один канал

У меня есть несколько goroutines, пытающихся получить на одном и том же канале одновременно. Похоже, что последний горутин, который начинает получать по каналу, получает значение. Это где-то в спецификации языка или это поведение undefined?

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        <-c
        c <- fmt.Sprintf("goroutine %d", i)
    }(i)
}
c <- "hi"
fmt.Println(<-c)

Вывод:

goroutine 4

Пример на игровой площадке

EDIT:

Я только понял, что это сложнее, чем я думал. Сообщение передается по всем goroutines.

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        msg := <-c
        c <- fmt.Sprintf("%s, hi from %d", msg, i)
    }(i)
}
c <- "original"
fmt.Println(<-c)

Вывод:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

Пример на игровой площадке

4b9b3361

Ответ 1

Да, это сложно, Но есть несколько эмпирических правил, которые должны сделать вещи более понятными.

  • предпочитают использовать формальные аргументы для каналов, которые вы передаете go-подпрограмм вместо доступа к каналам в глобальной области. Вы можете получить больше компилятора, проверяя этот способ, а также лучшую модульность.
  • избегать чтения и записи на одном канале в определенной рутине (включая "главный" ). В противном случае взаимоблокировка представляет собой гораздо больший риск.

Вот альтернативная версия вашей программы, применяющая эти два руководства. Этот случай демонстрирует много писателей и одного читателя по каналу:

c := make(chan string)

for i := 1; i <= 5; i++ {
    go func(i int, co chan<- string) {
        for j := 1; j <= 5; j++ {
            co <- fmt.Sprintf("hi from %d.%d", i, j)
        }
    }(i, c)
}

for i := 1; i <= 25; i++ {
    fmt.Println(<-c)
}

http://play.golang.org/p/quQn7xePLw

Он создает пять подпрограмм, записывающих один канал, каждый из которых записывается пять раз. Основная рутина читает все двадцать пять сообщений - вы можете заметить, что порядок, в котором они появляются, часто не является последовательным (т.е. concurrency).

В этом примере демонстрируется функция каналов Go: возможно, у нескольких авторов есть один канал; Go будет чередовать сообщения автоматически.

То же самое относится к одному автору и нескольким считывателям на одном канале, как показано во втором примере:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
    go func(i int, ci <-chan int) {
        j := 1
        for v := range ci {
            time.Sleep(time.Millisecond)
            fmt.Printf("%d.%d got %d\n", i, j, v)
            j += 1
        }
        w.Done()
    }(i, c)
}

for i := 1; i <= 25; i++ {
    c <- i
}
close(c)
w.Wait()

Этот второй пример включает в себя ожидание, наложенное на главный горутин, который в противном случае выходил бы быстро и заставлял бы остальные пять goroutines быть завершенными раньше ( благодаря olov для этой коррекции).

В обоих примерах буферизация не требовалась. Как правило, хорошим принципом является просмотр буферизации только в качестве усилителя производительности. Если ваша программа не заходит в тупик без буферов, она также не будет блокирована буферами (но обратное не всегда верно). Итак, как другое эмпирическое правило, начните без буферизации, а затем добавьте его по мере необходимости.

Ответ 2

Поздний ответ, но я надеюсь, что это поможет другим в будущем, например Long Polling, "Global" Кнопка, трансляция для всех?

Эффективный шаг объясняет проблему:

Приемники всегда блокируются до тех пор, пока не будут получены данные.

Это означает, что у вас не может быть более 1 горутин, прослушивающих 1 канал, и ожидайте, что ВСЕ ГОРОТИНЫ получат одинаковое значение.

Запустите этот Пример кода.

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
        for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }

    close(c)
}

Вы не увидите "count 1" более одного раза, хотя есть 5 goroutines, которые прослушивают канал. Это связано с тем, что, когда первый горутин блокирует канал, все остальные горуты должны ждать в очереди. Когда канал разблокирован, счет уже принят и удаляется из канала, поэтому следующий goroutine в строке получает следующее значение счета.

Ответ 3

Это сложно.

Также посмотрите, что происходит с GOMAXPROCS = NumCPU+1. Например,

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU() + 1)
    fmt.Print(runtime.GOMAXPROCS(0))
    c := make(chan string)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- ", original"
    fmt.Println(<-c)
}

Вывод:

5, original, hi from 0, hi from 4

И посмотрите, что происходит с буферизованными каналами. Например,

package main

import "fmt"

func main() {
    c := make(chan string, 5+1)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- "original"
    fmt.Println(<-c)
}

Вывод:

original

Вы также сможете объяснить эти случаи.

Ответ 4

Я изучил существующие решения и создал простую широковещательную библиотеку https://github.com/grafov/bcast.

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send("test message") // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages

Ответ 5

Для нескольких горутин слушайте по одному каналу, да, это возможно. ключевым моментом является само сообщение, вы можете определить такое сообщение следующим образом:

package main

import (
    "fmt"
    "sync"
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // both block or non-block are ok
    var wg sync.WaitGroup
    receiver := 25 // specify receiver count

    sender := func() {
        o := &obj {
            msg: "hello everyone!",
            receiver: receiver,
        }
        ch <- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := <-ch
        fmt.Printf("%d received at %d\n", idx, o.receiver)
        o.receiver--
        if o.receiver > 0 {
            ch <- o // forward to others
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go sender()
    for i:=0; i<reciever; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

Выход случайный:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4