Skip to content

Instantly share code, notes, and snippets.

@athoune
Created December 17, 2018 23:08
Show Gist options
  • Save athoune/9ef2736a89c5e565ff8ffd307807ca86 to your computer and use it in GitHub Desktop.
Save athoune/9ef2736a89c5e565ff8ffd307807ca86 to your computer and use it in GitHub Desktop.
Pubsub
package pubsub
import (
"sync"
)
type Pubsub struct {
channels map[string]map[int]*Channel
cpt int
lock sync.Mutex
}
type Channel struct {
C chan interface{}
idx int
pubsub *Pubsub
channel string
Done chan interface{}
}
func New() *Pubsub {
return &Pubsub{
channels: make(map[string]map[int]*Channel),
cpt: 0,
}
}
func (p *Pubsub) Subscribe(channel string) *Channel {
p.lock.Lock()
defer p.lock.Unlock()
c := &Channel{
C: make(chan interface{}),
Done: make(chan interface{}, 1),
idx: p.cpt,
pubsub: p,
channel: channel,
}
if _, ok := p.channels[channel]; ok {
p.channels[channel][c.idx] = c
} else {
p.channels[channel] = map[int]*Channel{c.idx: c}
}
p.cpt++
return c
}
func (p *Pubsub) Publish(channel string, msg interface{}) int {
channels, ok := p.channels[channel]
if !ok {
return 0
}
for _, c := range channels {
c.C <- msg
}
return len(channels)
}
func (c *Channel) Close() {
delete(c.pubsub.channels[c.channel], c.idx)
c.Done <- true
}
package pubsub
import (
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
func TestPubSub(t *testing.T) {
var msg, end sync.WaitGroup
p := New()
slurp := func(name string) *Channel {
fmt.Println(name)
c := p.Subscribe("hello")
go func() {
for {
select {
case <-c.Done:
end.Done()
return
case m := <-c.C:
fmt.Println(name, m)
msg.Done()
}
}
}()
return c
}
msg.Add(2)
end.Add(2)
c1 := slurp("Alice")
c2 := slurp("Bob")
n := p.Publish("hello", "World")
msg.Wait()
assert.Equal(t, 2, n)
c1.Close()
c2.Close()
end.Wait()
assert.Len(t, p.channels["hello"], 0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment