Skip to content

Instantly share code, notes, and snippets.

@VMois
Last active August 29, 2021 19:30
Show Gist options
  • Save VMois/b50f71114b4086c724b1aec4b7b916a3 to your computer and use it in GitHub Desktop.
Save VMois/b50f71114b4086c724b1aec4b7b916a3 to your computer and use it in GitHub Desktop.
Pubsub in Golang using channels. Subscribe, Unsubscribe, Publish and Close methods are present
// Pubsub using channels. Subscribe, Unsubscribe, Publish and Close methods are present.
// Code is taken from this article https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/ and Unsubscribe method added by VMois
import "sync"
type Pubsub struct {
mu sync.RWMutex
subs map[string][]chan string
closed bool
}
func NewPubsub() *Pubsub {
ps := &Pubsub{}
ps.subs = make(map[string][]chan string)
return ps
}
func (ps *Pubsub) Subscribe(topic string) <-chan string {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan string, 1)
ps.subs[topic] = append(ps.subs[topic], ch)
return ch
}
func (ps *Pubsub) Unsubscribe(topic string, c <-chan string) {
ps.mu.Lock()
defer ps.mu.Unlock()
indexToRemove := -1
for index, ch := range ps.subs[topic] {
if c == ch {
indexToRemove = index
close(ch)
break
}
}
ps.subs[topic] = append(ps.subs[topic][:indexToRemove], ps.subs[topic][indexToRemove+1:]...)
}
func (ps *Pubsub) Publish(topic string, msg OpResult) {
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.closed {
return
}
for _, ch := range ps.subs[topic] {
ch <- msg
}
}
func (ps *Pubsub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.closed {
ps.closed = true
for _, subs := range ps.subs {
for _, ch := range subs {
close(ch)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment