Skip to content

Instantly share code, notes, and snippets.

@kwiesmueller
Last active July 11, 2018 01:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kwiesmueller/59121f23c7968b5104a153d7beb61f03 to your computer and use it in GitHub Desktop.
Save kwiesmueller/59121f23c7968b5104a153d7beb61f03 to your computer and use it in GitHub Desktop.
Channel-based fan-out implementation in Go with dynamic, thread-safe subscriptions and unsubscriptions, sending to and receiving from an arbitrary number (n) of publishers and subscribers. This implementation relies solely on channels and does not use mutexes.
package main
import (
"context"
"fmt"
"math/rand"
"os"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/seibert-media/golibs/log"
)
// Process-wide state shared by main, the publishers and the subscribers.
var (
	// logger is the structured logger used by every function in this file.
	// main assigns it before starting any goroutine.
	logger *log.Logger

	// publishCount is bumped atomically once for every event a publisher sends.
	publishCount int64
	// resultCount is bumped atomically once for every event any subscriber receives.
	resultCount int64
	// realReceive accumulates, atomically, the receive counters of the
	// subscriptions that main cancels.
	realReceive int64
)
// Sizing of the experiment.
const (
	// subscriberCount is the number of subscribers main creates.
	subscriberCount int = 20
	// publisherCount is the number of publisher goroutines main starts.
	publisherCount int = 1
	// eventCount is the number of events each publisher sends.
	eventCount int = 10000
)

// Feature switches.
const (
	// enableCancel turns on late subscription and early cancellation.
	enableCancel bool = true
	// writeToFiles makes every subscriber also write received messages to a
	// file (debugging aid).
	writeToFiles bool = false
)
// Timing knobs of the experiment.
var (
	// publishSleep is how long a publisher pauses between two events. The
	// value is drawn once, when the package is initialised, from the range
	// [0, 5) nanoseconds.
	publishSleep = time.Duration(rand.Intn(5)) * time.Nanosecond

	// subscriptionTimeout is how long the dispatcher waits for a single
	// subscription to accept an event before giving up on it.
	subscriptionTimeout = time.Second

	// dispatchTimeout is how long the dispatcher waits for another
	// subscription to become available while delivering an event.
	dispatchTimeout = time.Millisecond

	// noEventTimeout is how long the dispatcher waits for a new event before
	// counting an idle tick.
	noEventTimeout = time.Millisecond

	// earlyCloseDuration returns the delay after which a subscription gets
	// cancelled: a random whole number of seconds in [0, 5).
	earlyCloseDuration = func() time.Duration {
		return time.Second * time.Duration(rand.Intn(5))
	}

	// lateAddDuration returns the delay after which a late subscription gets
	// added: a random whole number of seconds in [0, 5).
	lateAddDuration = func() time.Duration {
		return time.Second * time.Duration(rand.Intn(5))
	}
)
// elapsed captures the current time and hands back a function that logs a
// "finished" entry containing that start time and the time spent since.
// It is meant to be used as `defer elapsed()()`.
func elapsed() func() {
	begin := time.Now()
	report := func() {
		took := time.Since(begin)
		logger.Info("finished", zap.Time("started", begin), zap.Duration("took", took))
	}
	return report
}
// main runs the fan-out experiment: it starts the dispatcher, creates
// subscriberCount subscribers, starts publisherCount publishers that each
// send eventCount events, and then blocks until the dispatcher signals on
// p.done.
func main() {
	logger = log.New("", true)
	resultCount = 0

	p := &Publisher{
		events:    make(chan *Event),
		done:      make(chan error),
		listeners: make(chan *subscription),
	}

	logger.Info("starting")
	defer elapsed()()

	go p.Dispatch()

	for is := 1; is <= subscriberCount; is++ {
		if is%2 == 0 && enableCancel {
			// Even-numbered subscribers join late, after a random delay.
			go func(is int) {
				<-time.After(lateAddDuration())
				p.buildSubscriber(fmt.Sprintf("%d", is))
			}(is)
			continue
		}

		// The remaining subscribers join immediately and, when cancellation
		// is enabled, are cancelled again after a random delay.
		cancel, s := p.buildSubscriber(fmt.Sprintf("%d", is))
		go func(s *subscription, cancel context.CancelFunc) {
			<-time.After(earlyCloseDuration())
			if enableCancel {
				logger.Info("canceling subscription",
					zap.String("name", s.name),
					zap.Int64("after", atomic.LoadInt64(&publishCount)),
					zap.Int64("received", atomic.AddInt64(&realReceive, atomic.LoadInt64(&s.ec))))
				cancel()
			}
		}(s, cancel)
	}

	for ip := 0; ip < publisherCount; ip++ {
		// ip is passed as an argument so that every publisher labels its
		// events with its own index rather than with whatever value the
		// shared loop variable holds when the goroutine gets scheduled.
		go func(ip int) {
			source := fmt.Sprintf("%d", ip)
			for i := 1; i <= eventCount; i++ {
				atomic.AddInt64(&publishCount, 1)
				p.Publish(&Event{id: int64(i), msg: fmt.Sprintf("%d", i), source: source})
				time.Sleep(publishSleep)
			}
			logger.Info("finished publishing",
				zap.Int64("count", atomic.LoadInt64(&publishCount)))
		}(ip)
	}

	// Dispatch sends on p.done once it has been idle often enough.
	<-p.done
}
// buildSubscriber subscribes a new listener called name to p and starts a
// goroutine that consumes its events. For every event the goroutine bumps the
// subscription's own counters and the global resultCount, logs the event,
// optionally appends the message to the file "subscribers/<name>" (when
// writeToFiles is set), and warns when the event id differs from the number
// of events this subscription has received so far.
//
// It returns the function that cancels the subscription's context together
// with the subscription itself.
func (p *Publisher) buildSubscriber(name string) (context.CancelFunc, *subscription) {
	ctx, cancel := context.WithCancel(context.Background())

	// f stays nil when file output is disabled or the file cannot be created;
	// the consumer then simply skips writing.
	var f *os.File
	if writeToFiles {
		var err error
		f, err = os.Create(fmt.Sprintf("subscribers/%s", name))
		if err != nil {
			logger.Warn("creating subscriber file",
				zap.String("subscription", name),
				zap.Error(err))
		}
	}

	s := p.Subscribe(ctx, name, make(chan *Event))
	go func(sub *subscription) {
		// NOTE(review): nothing visible in this file closes the listener
		// channel, so this loop (and the Close below) presumably only ends
		// with the process — confirm.
		for e := range sub.listener {
			ec := atomic.AddInt64(&sub.ec, 1)
			prv := atomic.AddInt64(&sub.previous, 1)
			logger.Info("received event",
				zap.String("event", e.msg),
				zap.String("source", e.source),
				zap.String("subscription", sub.name),
				zap.Int64("ownCount", ec),
				zap.Int64("globalCount", atomic.AddInt64(&resultCount, 1)))
			if f != nil {
				if _, err := fmt.Fprintln(f, e.msg); err != nil {
					logger.Warn("writing subscriber file",
						zap.String("subscription", sub.name),
						zap.Error(err))
				}
			}
			// prv is a running count of received events, so after one
			// skipped event every later event is reported as well.
			if e.id != prv {
				logger.Warn("missing id",
					zap.String("event", e.msg),
					zap.String("source", e.source),
					zap.String("subscription", sub.name),
					zap.Int64("ownCount", ec),
					zap.Int64("globalCount", atomic.LoadInt64(&resultCount)),
					zap.Int64("id", e.id),
					zap.Int64("prv", prv))
			}
		}
		if f != nil {
			if err := f.Close(); err != nil {
				logger.Warn("closing subscriber file",
					zap.String("subscription", sub.name),
					zap.Error(err))
			}
		}
	}(s)

	return cancel, s
}
// Event is one message travelling from a publisher to the subscriptions.
type Event struct {
	// id is the event's sequence number; subscribers compare it against
	// their own receive count.
	id int64
	// source identifies the publisher, msg is the payload text.
	source, msg string
}
// subscription is one registered listener of a Publisher.
type subscription struct {
	// name labels the subscription in log output.
	name string
	// ec counts the events received so far; previous is a second running
	// count that the consumer compares against incoming event ids. Both are
	// updated with sync/atomic.
	ec, previous int64
	// done is closed on unsubscribe, i.e. when the subscription's context
	// is cancelled.
	done chan bool
	// listener is the channel events are delivered on. Subscribe sets it to
	// nil once the subscription's context is cancelled.
	listener chan *Event
}
// Publisher fans events out to subscriptions. All coordination goes through
// its three channels.
type Publisher struct {
	// events carries published events from Publish to Dispatch.
	events chan *Event
	// done is received from by Dispatch to stop with the received error, and
	// sent to by Dispatch once it has been idle often enough.
	done chan error
	// listeners holds the subscriptions currently waiting for the next
	// event; Dispatch takes them out and puts the live ones back.
	listeners chan *subscription
}
// Subscribe registers a new subscription called name that receives events on
// e and returns it. Registration is asynchronous: the subscription is handed
// to the dispatcher from a separate goroutine, so Subscribe itself does not
// block. When ctx is cancelled the subscription's done channel is closed and
// its listener is set to nil.
func (p *Publisher) Subscribe(ctx context.Context, name string, e chan *Event) *subscription {
	sub := &subscription{
		name:     name,
		listener: e,
		done:     make(chan bool),
	}
	logger.Info("subscribing",
		zap.String("name", sub.name),
		zap.Int64("after", atomic.LoadInt64(&resultCount)))

	// Hand the subscription to the dispatcher without blocking the caller.
	register := func() { p.listeners <- sub }
	go register()

	// Tear the subscription down once its context ends.
	// NOTE(review): the write to sub.listener is not synchronized with the
	// reads of that field in Dispatch and in the consumer goroutine.
	watch := func() {
		<-ctx.Done()
		close(sub.done)
		sub.listener = nil
	}
	go watch()

	return sub
}
// Publish hands e to the dispatcher. The events channel is created unbuffered
// by main, so the call blocks until Dispatch picks the event up.
func (p *Publisher) Publish(e *Event) {
	queue := p.events
	queue <- e
}
// Dispatch is the fan-out loop. It runs until an error value is received on
// p.done (which it returns) or until it has seen more than ten idle ticks, in
// which case it signals on p.done and returns nil.
//
// For every event taken from p.events it repeatedly pulls a subscription from
// p.listeners and tries to deliver the event to it:
//   - if the subscription's done channel is closed, the subscription is
//     dropped;
//   - if the subscription accepts the event, it is remembered for re-queueing;
//   - if neither happens within subscriptionTimeout, the subscription is
//     dropped.
//
// The round for an event ends when no further subscription shows up within
// dispatchTimeout. The subscriptions that accepted the event are then put
// back onto p.listeners from separate goroutines, so the dispatcher never
// blocks on re-queueing.
//
// Whenever no event arrives within noEventTimeout an idle tick is counted and
// the current totals are logged.
func (p *Publisher) Dispatch() error {
	// expected is the receive total if every subscriber saw every event.
	const expected = int64((eventCount * subscriberCount) * publisherCount)

	idle := 0
	for {
		select {
		case err := <-p.done:
			return err

		case e := <-p.events:
			// al collects the subscriptions that took this event.
			var al []*subscription
		loop:
			for {
				select {
				case l := <-p.listeners:
					select {
					case _, ok := <-l.done:
						if !ok {
							logger.Info("subscription ended",
								zap.String("event", e.msg),
								zap.String("source", e.source),
								zap.String("subscription", l.name))
						}
					case l.listener <- e:
						al = append(al, l)
					case <-time.After(subscriptionTimeout):
						logger.Info("subscription timeout",
							zap.String("event", e.msg),
							zap.String("source", e.source),
							zap.String("subscription", l.name))
					}
				case <-time.After(dispatchTimeout):
					logger.Info("dispatch timeout",
						zap.String("type", "not listening"),
						zap.String("event", e.msg),
						zap.String("source", e.source))
					break loop
				}
			}
			for _, l := range al {
				go func(l *subscription) { p.listeners <- l }(l)
			}

		case <-time.After(noEventTimeout):
			idle++
			received := atomic.LoadInt64(&resultCount)
			logger.Info("dispatch timeout", zap.String("type", "no event"))
			logger.Info("total event count",
				zap.Int64("published", atomic.LoadInt64(&publishCount)),
				zap.Int64("received", received),
				zap.Int64("expected", expected),
				zap.Int64("diff", expected-received),
				zap.Int("events", eventCount),
				zap.Int("subscribers", subscriberCount),
				zap.Int64("realReceive", atomic.LoadInt64(&realReceive)),
			)
			if idle > 10 {
				// Tell whoever waits on p.done that dispatching is over and
				// stop; looping on would block on this send at the next idle
				// tick once nobody is receiving any more.
				p.done <- nil
				return nil
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment