Подтвердить что ты не робот

Как транслировать сообщение с использованием канала

Я новичок, и я пытаюсь создать простой чат-сервер, где клиенты могут передавать сообщения всем подключенным клиентам.

На моем сервере у меня есть goroutine (бесконечный цикл), который принимает соединение, и все соединения принимаются каналом.

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

Затем я запускаю обработчик (goroutine) для каждого подключенного клиента. Внутри обработчика я пытаюсь транслировать все соединения, итерации по каналу.

for c := range ch {
    conn.Write(msg)
}

Однако я не могу транслировать, потому что (я думаю, что читаю документы), канал должен быть закрыт перед итерацией. Я не уверен, когда я должен закрыть канал, потому что я хочу постоянно принимать новые соединения, и закрытие канала не позволит мне это сделать. Если кто-нибудь может мне помочь или предоставить лучший способ для трансляции сообщений всем подключенным клиентам, это будет оценено по достоинству.

4b9b3361

Ответ 1

То, что вы делаете, - это шаблон выключения, т.е. несколько конечных точек прослушивают один источник входного сигнала. Результатом этого шаблона является то, что только один из этих слушателей сможет получить сообщение, когда есть сообщение в источнике ввода. Единственным исключением является close канала. Этот close будет распознаваться всеми слушателями и, таким образом, "широковещательным".

Но то, что вы хотите сделать, - это трансляция сообщения, считанного из соединения, поэтому мы могли бы сделать что-то вроде этого:

Когда известно количество слушателей

Пусть каждый рабочий прослушивает выделенный широковещательный канал и отправляет сообщение с основного канала на каждый выделенный широковещательный канал.

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

И тогда у нас может быть куча рабочих:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

Затем запустите наш слушатель:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

И диспетчер:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

Когда количество слушателей неизвестно

В этом случае решение, приведенное выше, все еще работает. Единственное различие заключается в том, что всякий раз, когда вам нужен новый рабочий, вам нужно создать нового рабочего, запустить его, а затем вставить в кусок workers. Но для этого метода требуется поточно-безопасный срез, для которого требуется блокировка. Одна из реализаций может выглядеть следующим образом:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

Всякий раз, когда вы хотите запустить рабочего:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

И ваш диспетчер будет изменен на:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

Последние слова: никогда не оставляйте свисающий горутин

Одна из хороших практик: никогда не оставляйте болтающуюся горутину. Поэтому, когда вы закончили слушать, вам нужно закрыть все горуты, которые вы уволили. Это будет сделано через quit канал в worker:

Сначала нам нужно создать глобальный канал сигнализации quit:

globalQuit := make(chan struct{})

И всякий раз, когда мы создаем рабочего, мы назначаем ему канал globalQuit в качестве его выходного сигнала:

worker.quit = globalQuit

Затем, когда мы хотим закрыть всех рабочих, мы просто делаем:

close(globalQuit)

Так как close будет распознаваться всеми слушающими goroutines (это то, что вы поняли), все goroutines будут возвращены. Не забудьте также закрыть свою диспетчерскую рутину, но я оставлю ее вам:)

Ответ 2

Более элегантным решением является "брокер", где клиенты могут подписываться и отменять подписку на сообщения.

Для элегантной обработки подписки и отмены подписки мы можем использовать для этого каналы, поэтому основной цикл посредника, который получает и распространяет сообщения, может включать все это с помощью одного оператора select, а синхронизация дается из характера решения.

Другой трюк заключается в том, чтобы хранить подписчиков на карте, сопоставляя их с каналом, который мы используем для рассылки им сообщений. Поэтому используйте канал в качестве ключа на карте, а затем добавление и удаление клиентов "просто". Это стало возможным благодаря тому, что значения каналов сравнимы, и их сравнение очень эффективно, поскольку значения каналов являются простыми указателями на дескрипторы каналов.

Без лишних слов, здесь простая реализация брокера:

type Broker struct {
    stopCh    chan struct{}
    publishCh chan interface{}
    subCh     chan chan interface{}
    unsubCh   chan chan interface{}
}

func NewBroker() *Broker {
    return &Broker{
        stopCh:    make(chan struct{}),
        publishCh: make(chan interface{}, 1),
        subCh:     make(chan chan interface{}, 1),
        unsubCh:   make(chan chan interface{}, 1),
    }
}

func (b *Broker) Start() {
    subs := map[chan interface{}]struct{}{}
    for {
        select {
        case <-b.stopCh:
            return
        case msgCh := <-b.subCh:
            subs[msgCh] = struct{}{}
        case msgCh := <-b.unsubCh:
            delete(subs, msgCh)
        case msg := <-b.publishCh:
            for msgCh := range subs {
                // msgCh is buffered, use non-blocking send to protect the broker:
                select {
                case msgCh <- msg:
                default:
                }
            }
        }
    }
}

func (b *Broker) Stop() {
    close(b.stopCh)
}

func (b *Broker) Subscribe() chan interface{} {
    msgCh := make(chan interface{}, 5)
    b.subCh <- msgCh
    return msgCh
}

func (b *Broker) Unsubscribe(msgCh chan interface{}) {
    b.unsubCh <- msgCh
}

func (b *Broker) Publish(msg interface{}) {
    b.publishCh <- msg
}

Пример использования этого:

func main() {
    // Create and start a broker:
    b := NewBroker()
    go b.Start()

    // Create and subscribe 3 clients:
    clientFunc := func(id int) {
        msgCh := b.Subscribe()
        for {
            fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
        }
    }
    for i := 0; i < 3; i++ {
        go clientFunc(i)
    }

    // Start publishing messages:
    go func() {
        for msgId := 0; ; msgId++ {
            b.Publish(fmt.Sprintf("msg#%d", msgId))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    time.Sleep(time.Second)
}

Выход из вышеперечисленного будет (попробуйте на Go Playground):

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

улучшения

Вы можете рассмотреть следующие улучшения. Они могут или не могут быть полезны в зависимости от того, как/для чего вы используете брокера.

Broker.Unsubscribe() может закрыть канал сообщений, сигнализируя, что на него больше не будет отправлено сообщений:

func (b *Broker) Unsubscribe(msgCh chan interface{}) {
    b.unsubCh <- msgCh
    close(msgCh)
}

Это позволит клиентам range по каналу сообщений, например так:

msgCh := b.Subscribe()
for msg := range msgCh {
    fmt.Printf("Client %d got message: %v\n", id, msg)
}

Затем, если кто-то msgCh этого msgCh вот так:

b.Unsubscribe(msgCh)

Вышеупомянутый цикл диапазона прекратится после обработки всех сообщений, которые были отправлены до вызова Unsubscribe().

Если вы хотите, чтобы ваши клиенты полагались на то, что канал сообщений закрыт, а время жизни брокера меньше, чем время жизни вашего приложения, то вы также можете закрыть все подписанные клиенты, когда брокер остановлен, в методе Start() например:

case <-b.stopCh:
    for msgCh := range subs {
        close(msgCh)
    }
    return

Ответ 3

Передача на фрагмент канала и использование sync.Mutex для управления добавлением и удалением канала может быть самым простым способом в вашем случае.

Вот что вы можете сделать для broadcast в golang:

  • Вы можете транслировать изменение статуса общего ресурса с помощью sync.Cond. Этот способ не имеет никакого выделения после установки, но вы не можете добавить функционал тайм-аута или работать с другим каналом.
  • Вы можете транслировать изменение статуса общего ресурса с закрытым старым каналом и создавать новый канал и sync.Mutex. Таким образом, для каждого изменения статуса есть один адрес, но вы можете добавить функциональность тайм-аута и работать с другим каналом.
  • Вы можете транслировать до фрагмента функции обратного вызова и использовать sync.Mutex для управления ими. Вызывающий может делать каналы. Таким образом, у каждого вызывающего есть несколько адресов и работа с другим каналом.
  • Вы можете транслировать на фрагмент канала и использовать sync.Mutex для управления ими. Таким образом, у каждого вызывающего есть несколько адресов и работа с другим каналом.
  • Вы можете транслировать на фрагмент sync.WaitGroup и использовать sync.Mutex для управления ими.

Ответ 4

Поскольку каналы Go следуют шаблону Communicating Sequential Processes (CSP), каналы представляют собой объект связи "точка-точка". В каждом обмене всегда есть один писатель и один читатель.

Тем не менее, каждый конец канала может использоваться совместно с несколькими goroutines. Это безопасно - опасное состояние гонки не существует.

Таким образом, может быть несколько авторов, разделяющих конец записи. И/или может быть несколько читателей, разделяющих конец чтения. Я написал больше об этом в другом ответе, который включает примеры.

Если вам действительно нужна трансляция, вы не можете сделать это напрямую, но нетрудно реализовать промежуточную версию goroutine, которая копирует значение для каждой из группы выходных каналов.

Ответ 5

Я создал простую библиотеку для поддержки вещания канала. Это может использоваться для широковещательной передачи асинхронных результатов от многих вызовов API современных сервисов. https://github.com/guiguan/caster#broadcast-a-go-channel