Skip to content

Instantly share code, notes, and snippets.

@johnrichardrinehart
Last active May 14, 2020 18:48
Show Gist options
  • Save johnrichardrinehart/8aae1d73b5bdda33eabff1cd91eb7979 to your computer and use it in GitHub Desktop.
Save johnrichardrinehart/8aae1d73b5bdda33eabff1cd91eb7979 to your computer and use it in GitHub Desktop.
A message broker using two concurrency primitives (channels and mutexes) in Go
package broker
import (
"sync"
"time"
"github.com/rs/zerolog/log"
)
type (
message []byte
subscriber chan message
Broker struct {
publishCh chan message // chan []byte
subCh chan subscriber // chan chan []byte
unsubCh chan subscriber // chan chan []byte
subs map[subscriber]bool // map[chan []byte]bool
sync.Mutex // Mutex instead of RWMutex since we only have at most one concurrent reader
shutdownCh chan bool // uh... a shutdown channel
wg *sync.WaitGroup // wg.Wait() in Stop() to wait for handleSubscriptions and handlePublishing() to terminate
}
)
func New() *Broker {
return &Broker{
publishCh: make(chan message),
subCh: make(chan subscriber),
unsubCh: make(chan subscriber),
subs: make(map[subscriber]bool), // map of subscribers
shutdownCh: make(chan bool),
wg: &sync.WaitGroup{},
}
}
// Start spawns two concurrently-executed goroutines (not nested) which manage each 1) subscription and 2) publishing
// The justification for separating these can be made in a few different ways. The first consideration is that of
// responsibility and is a semantic one. Subscription and message passing are different behaviors; so, when
// considering "separation of concerns" it's clear that these should be separate. The other justification is one based
// on performance. In the situation where a large number of slow consumers are being dispatched messages, running
// subscriptions in the same goroutine may block (un)subscriptions for an unacceptably long amount of time. Of course,
// this delay can be mitigated in a variety of ways. This solution is only one and benchmarking should be used to determine
// an optimal design for one's hardware and one's runtime context (number of processors and "neighboring processes" being
// managed by the operating system)
func (b *Broker) Start() {
b.wg.Add(2)
go b.handleSubscriptions()
go b.handlePublishing()
}
// subscription handling goroutine (map writer)
func (b *Broker) handleSubscriptions() {
defer b.wg.Done()
for {
select {
case <-b.shutdownCh:
return // we're all done
case sub := <-b.subCh:
b.Lock()
b.subs[sub] = true
b.Unlock()
case sub := <-b.unsubCh:
b.Lock()
delete(b.subs, sub)
b.Unlock()
}
}
}
// message publishing goroutine (map reader)
func (b *Broker) handlePublishing() {
defer b.wg.Done()
for {
select {
case <-b.shutdownCh:
return
case msg := <-b.publishCh:
b.Lock()
for sub := range b.subs {
select {
case sub <- msg:
case <-time.After(10 * time.Millisecond):
log.Warn().Msg("failed to publish message to subscriber")
}
}
b.Unlock()
}
}
}
func (b *Broker) Subscribe(sub subscriber) {
b.subCh <- sub
}
func (b *Broker) Unsubscribe(sub subscriber) {
b.unsubCh <- sub
}
func (b *Broker) Stop() {
close(b.shutdownCh)
b.wg.Wait() // when both goroutines terminate
for sub := range b.subs {
close(sub) // make sure client knows that no more information is forthcoming...
delete(b.subs, sub)
}
}
func (b *Broker) Publish(msg message) {
b.publishCh <- msg
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment