Last active
May 14, 2020 18:48
-
-
Save johnrichardrinehart/8aae1d73b5bdda33eabff1cd91eb7979 to your computer and use it in GitHub Desktop.
A message broker using two concurrency primitives (channels and mutexes) in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package broker | |
import ( | |
"sync" | |
"time" | |
"github.com/rs/zerolog/log" | |
) | |
type ( | |
message []byte | |
subscriber chan message | |
Broker struct { | |
publishCh chan message // chan []byte | |
subCh chan subscriber // chan chan []byte | |
unsubCh chan subscriber // chan chan []byte | |
subs map[subscriber]bool // map[chan []byte]bool | |
sync.Mutex // Mutex instead of RWMutex since we only have at most one concurrent reader | |
shutdownCh chan bool // uh... a shutdown channel | |
wg *sync.WaitGroup // wg.Wait() in Stop() to wait for handleSubscriptions and handlePublishing() to terminate | |
} | |
) | |
func New() *Broker { | |
return &Broker{ | |
publishCh: make(chan message), | |
subCh: make(chan subscriber), | |
unsubCh: make(chan subscriber), | |
subs: make(map[subscriber]bool), // map of subscribers | |
shutdownCh: make(chan bool), | |
wg: &sync.WaitGroup{}, | |
} | |
} | |
// Start spawns two concurrently-executed goroutines (not nested) which manage each 1) subscription and 2) publishing | |
// The justification for separating these can be made in a few different ways. The first consideration is that of | |
// responsibility and is a semantic one. Subscription and message passing are different behaviors; so, when | |
// considering "separation of concerns" it's clear that these should be separate. The other justification is one based | |
// on performance. In the situation where a large number of slow consumers are being dispatched messages, running | |
// subscriptions in the same goroutine may block (un)subscriptions for an unacceptably long amount of time. Of course, | |
// this delay can be mitigated in a variety of ways. This solution is only one and benchmarking should be used to determine | |
// an optimal design for one's hardware and one's runtime context (number of processors and "neighboring processes" being | |
// managed by the operating system) | |
func (b *Broker) Start() { | |
b.wg.Add(2) | |
go b.handleSubscriptions() | |
go b.handlePublishing() | |
} | |
// subscription handling goroutine (map writer) | |
func (b *Broker) handleSubscriptions() { | |
defer b.wg.Done() | |
for { | |
select { | |
case <-b.shutdownCh: | |
return // we're all done | |
case sub := <-b.subCh: | |
b.Lock() | |
b.subs[sub] = true | |
b.Unlock() | |
case sub := <-b.unsubCh: | |
b.Lock() | |
delete(b.subs, sub) | |
b.Unlock() | |
} | |
} | |
} | |
// message publishing goroutine (map reader) | |
func (b *Broker) handlePublishing() { | |
defer b.wg.Done() | |
for { | |
select { | |
case <-b.shutdownCh: | |
return | |
case msg := <-b.publishCh: | |
b.Lock() | |
for sub := range b.subs { | |
select { | |
case sub <- msg: | |
case <-time.After(10 * time.Millisecond): | |
log.Warn().Msg("failed to publish message to subscriber") | |
} | |
} | |
b.Unlock() | |
} | |
} | |
} | |
func (b *Broker) Subscribe(sub subscriber) { | |
b.subCh <- sub | |
} | |
func (b *Broker) Unsubscribe(sub subscriber) { | |
b.unsubCh <- sub | |
} | |
func (b *Broker) Stop() { | |
close(b.shutdownCh) | |
b.wg.Wait() // when both goroutines terminate | |
for sub := range b.subs { | |
close(sub) // make sure client knows that no more information is forthcoming... | |
delete(b.subs, sub) | |
} | |
} | |
func (b *Broker) Publish(msg message) { | |
b.publishCh <- msg | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment