Skip to content

Instantly share code, notes, and snippets.

@sergey-shambir
Created December 3, 2018 08:01
Show Gist options
  • Save sergey-shambir/4d8e6eee52f82dc673800eac41daa412 to your computer and use it in GitHub Desktop.
Save sergey-shambir/4d8e6eee52f82dc673800eac41daa412 to your computer and use it in GitHub Desktop.
Another EventBus implementation
// Package eventbus provides event publisher/subscriber support.
// It's inspired by https://github.com/asaskevich/EventBus
// There are differences in API and implementation
// - deadlock prevention: this eventbus doesn't lock mutex when callbacks are called
// - API based on EventID and Event interface instead of strings and variadic arguments list
package eventbus
import (
"sync"
)
// EventID identifies events topic.
type EventID string
// Event must be implemented by anything that can be published
type Event interface {
EventID() EventID
}
// EventHandler is function that can be subscribed to the event
type EventHandler func(event Event)
// Subscription represents active event subscription
type Subscription struct {
eventID EventID
id uint64
}
// BusSubscriber allows to subscribe/unsubscribe own event handlers
type BusSubscriber interface {
Subscribe(eventID EventID, cb EventHandler) Subscription
Unsubscribe(id Subscription)
}
// BusPublisher allows to publish own events
type BusPublisher interface {
Publish(event Event)
}
// Bus allows to subscribe/unsubscribe to external events and publish own events
type Bus interface {
BusSubscriber
BusPublisher
}
// New returns new event bus
func New() Bus {
b := &bus{
infos: make(map[EventID]subscriptionInfoList),
}
return b
}
type subscriptionInfo struct {
id uint64
cb EventHandler
}
type subscriptionInfoList []*subscriptionInfo
type bus struct {
lock sync.Mutex
nextID uint64
infos map[EventID]subscriptionInfoList
}
func (bus *bus) Subscribe(eventID EventID, cb EventHandler) Subscription {
bus.lock.Lock()
defer bus.lock.Unlock()
id := bus.nextID
bus.nextID++
sub := &subscriptionInfo{
id: id,
cb: cb,
}
bus.infos[eventID] = append(bus.infos[eventID], sub)
return Subscription{
eventID: eventID,
id: id,
}
}
func (bus *bus) Unsubscribe(subscription Subscription) {
bus.lock.Lock()
defer bus.lock.Unlock()
if infos, ok := bus.infos[subscription.eventID]; ok {
for idx, info := range infos {
if info.id == subscription.id {
infos = append(infos[:idx], infos[idx+1:]...)
break
}
}
if len(infos) == 0 {
delete(bus.infos, subscription.eventID)
} else {
bus.infos[subscription.eventID] = infos
}
}
}
func (bus *bus) Publish(event Event) {
infos := bus.copySubscriptions(event.EventID())
for _, sub := range infos {
sub.cb(event)
}
}
func (bus *bus) copySubscriptions(eventID EventID) subscriptionInfoList {
// External code may subscribe/unsubscribe during iteration over callbacks,
// so we need to copy subscribers to invoke callbacks.
bus.lock.Lock()
defer bus.lock.Unlock()
if infos, ok := bus.infos[eventID]; ok {
return infos
}
return subscriptionInfoList{}
}
package eventbus
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const (
eventSolarEclipse = EventID("solar_eclipse")
eventMoonEclipse = EventID("moon_eclipse")
)
type solarEclipseEvent struct {
duration time.Duration
}
func (e *solarEclipseEvent) EventID() EventID {
return eventSolarEclipse
}
type moonEclipseEvent struct {
duration time.Duration
}
func (e *moonEclipseEvent) EventID() EventID {
return eventMoonEclipse
}
func TestBus_SubscribePublish(t *testing.T) {
bus := New()
hadEvent := false
duration := 100 * time.Second
bus.Subscribe(eventSolarEclipse, func(e Event) {
assert.Equal(t, e.EventID(), eventSolarEclipse)
se := e.(*solarEclipseEvent)
assert.Equal(t, se.duration, duration)
hadEvent = true
})
bus.Subscribe(eventMoonEclipse, func(e Event) {
t.Fatalf("should never be called")
})
assert.Equal(t, hadEvent, false)
bus.Publish(&solarEclipseEvent{
duration: duration,
})
assert.Equal(t, hadEvent, true)
}
func TestBus_PublishIncompatibleEvent(t *testing.T) {
bus := New()
duration := 100 * time.Second
bus.Subscribe(eventMoonEclipse, func(e Event) {
t.Fatalf("should never be called")
})
bus.Publish(&solarEclipseEvent{
duration: duration,
})
}
func TestBus_SubscribeUnsubscribe(t *testing.T) {
bus := New()
hadEvent := false
duration := 42 * time.Millisecond
id := bus.Subscribe(eventMoonEclipse, func(e Event) {
assert.Equal(t, e.EventID(), eventMoonEclipse)
se := e.(*moonEclipseEvent)
assert.Equal(t, se.duration, duration)
hadEvent = true
})
bus.Publish(&moonEclipseEvent{
duration: duration,
})
assert.Equal(t, hadEvent, true)
hadEvent = false
bus.Unsubscribe(id)
bus.Publish(&moonEclipseEvent{
duration: duration,
})
assert.Equal(t, hadEvent, false)
}
func TestBus_SubscribeMultiple(t *testing.T) {
moonEventCount := 0
moonEclipseDuration := 16 * time.Second
onMoonEclipse := func(e Event) {
assert.Equal(t, e.EventID(), eventMoonEclipse)
moonEventCount++
se := e.(*moonEclipseEvent)
assert.Equal(t, se.duration, moonEclipseDuration)
}
solarEventCount := 0
solarEclipseDuration := 77 * time.Millisecond
onSolarEclipse := func(e Event) {
assert.Equal(t, e.EventID(), eventSolarEclipse)
solarEventCount++
se := e.(*solarEclipseEvent)
assert.Equal(t, se.duration, solarEclipseDuration)
}
bus := New()
publishMoon := func() {
bus.Publish(&moonEclipseEvent{
duration: moonEclipseDuration,
})
}
publishSolar := func() {
bus.Publish(&solarEclipseEvent{
duration: solarEclipseDuration,
})
}
id1 := bus.Subscribe(eventMoonEclipse, onMoonEclipse)
id2 := bus.Subscribe(eventSolarEclipse, onSolarEclipse)
id3 := bus.Subscribe(eventMoonEclipse, onMoonEclipse)
publishMoon()
assert.Equal(t, moonEventCount, 2)
assert.Equal(t, solarEventCount, 0)
publishSolar()
assert.Equal(t, moonEventCount, 2)
assert.Equal(t, solarEventCount, 1)
bus.Unsubscribe(id1)
bus.Unsubscribe(id2)
publishMoon()
publishSolar()
assert.Equal(t, moonEventCount, 3)
assert.Equal(t, solarEventCount, 1)
bus.Unsubscribe(id3)
publishMoon()
publishSolar()
assert.Equal(t, moonEventCount, 3)
assert.Equal(t, solarEventCount, 1)
}
func TestBus_PublishRecursive(t *testing.T) {
moonEventCount := 0
bus := New()
publishMoon := func(duration time.Duration) {
bus.Publish(&moonEclipseEvent{
duration: duration,
})
}
onMoonEclipse := func(e Event) {
assert.Equal(t, e.EventID(), eventMoonEclipse)
moonEventCount++
se := e.(*moonEclipseEvent)
if se.duration < 16*time.Second {
publishMoon(2 * se.duration)
}
}
bus.Subscribe(eventMoonEclipse, onMoonEclipse)
publishMoon(1 * time.Second)
assert.Equal(t, moonEventCount, 5)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment