Skip to content

Instantly share code, notes, and snippets.

@iamtakingiteasy
Created May 3, 2017 14:14
Show Gist options
  • Save iamtakingiteasy/1f81ecb162fd65a5c5a5f6cd42442790 to your computer and use it in GitHub Desktop.
Save iamtakingiteasy/1f81ecb162fd65a5c5a5f6cd42442790 to your computer and use it in GitHub Desktop.
Simple go-ish message bus
package bus
import (
"fmt"
"testing"
)
type intMessage struct {
value int
}
func (msg *intMessage) Inspect() interface{} {
return msg.value
}
func (msg *intMessage) Destroy() {
// not much to do
}
func TestBus(t *testing.T) {
rootBus := MakeRootBus()
t.Run("BusTearDown", func(t *testing.T) {
bus := rootBus.Fork("mybus")
bus.Destroy()
})
t.Run("BusSubBus", func(t *testing.T) {
bus := rootBus.Fork("mybus")
if fmt.Sprint(rootBus) != "(root)" {
t.Fail()
}
if fmt.Sprint(bus) != "(root)/mybus" {
t.Fail()
}
if rootBus.Lookup("mybus") != bus {
t.Fail()
}
bus.Destroy()
})
t.Run("BusSendReceive", func(t *testing.T) {
bus := rootBus.Fork("mybus")
subscription := bus.Subscribe("abc")
if subscription.Topic() != "abc" {
t.Fail()
}
reference := &intMessage{123}
done := make(chan bool, 1)
go func() {
msg := subscription.Receive()
if msg != reference || msg.Inspect() != reference.Inspect() {
t.Fail()
}
subscription.Unsubscribe()
done <- true
close(done)
}()
bus.Publish("abc", reference)
<-done
func() {
defer func() {
recover()
}()
subscription.Receive()
t.Fail()
}()
bus.Destroy()
})
rootBus.Destroy()
}
package bus
// Marks destroyable instance
type Destroyable interface {
// Destroys object and returns all acquired resources
Destroy()
}
// Marks message message
type Message interface {
// Returns immutable (copy) data inside
Inspect() interface{}
}
// Marks publishable message message
type MessageValue interface {
Message
Destroyable
}
// Actual subscription
type Subscription interface {
// Unsubscribes from bus
Unsubscribe()
// Receives a message (just like <- chan)
Receive() Message
// Returns topic on which it was subscribed
Topic() interface{}
}
// Message bus
type Bus interface {
// Publishes destroyable message onto this bus with given topic
Publish(topic interface{}, value MessageValue)
// Subsribes to given topic
Subscribe(topic interface{}) Subscription
// Forks given bus, making child sub-bus (messages are not inhereited)
Fork(name string) BusValue
// Look up a child sub-bus(es)
Lookup(name ...string) Bus
}
// Destroyable bus for the bus owner
type BusValue interface {
Bus
Destroyable
}
// Returns new root-level bus
func MakeRootBus() BusValue {
return makeBus(nil, "(root)")
}
package bus
import (
"strings"
)
// topic+message pair
type pair struct {
topic interface{}
message MessageValue
}
// simple subscription
type simpleSubscription struct {
bus *simpleBus
topic interface{}
queue chan Message
valid bool
}
func (sub *simpleSubscription) Unsubscribe() {
sub.valid = false
sub.bus.queue <- &pair{nil, sub}
}
func (sub *simpleSubscription) Inspect() interface{} {
return *sub
}
func (sub *simpleSubscription) Destroy() {
close(sub.queue)
sub.topic = nil
sub.bus = nil
}
func (sub *simpleSubscription) Receive() Message {
if !sub.valid {
panic("Receiving on unsubscribed subscription")
}
return <- sub.queue
}
func (sub *simpleSubscription) Topic() interface{} {
return sub.topic
}
type simpleBus struct {
parent *simpleBus
name string
consumers map[interface{}]map[*simpleSubscription]bool
forks map[string]BusValue
queue chan *pair
shutdown bool
shutdownComplete chan bool
}
func (bus *simpleBus) Publish(topic interface{}, value MessageValue) {
if bus.shutdown {
panic("Publishing on shutdowned bus")
}
if topic == nil {
panic("Publishing on nil topic")
}
if value == nil {
panic("Publishing nil message value")
}
bus.queue <- &pair{topic, value}
}
func (bus *simpleBus) Subscribe(topic interface{}) Subscription {
if topic == nil {
panic("Subscribing on nil topic")
}
s := &simpleSubscription{bus, topic, make(chan Message), true}
tmap, ok := bus.consumers[topic]
if !ok {
tmap = make(map[*simpleSubscription]bool)
bus.consumers[topic] = tmap
}
tmap[s] = true
return s
}
func (bus *simpleBus) Fork(name string) BusValue {
newbus := makeBus(bus, name)
bus.forks[name] = newbus
return newbus
}
func (bus *simpleBus) Lookup(name ...string) Bus {
if len(name) == 0 {
return bus
}
if fork, ok := bus.forks[name[0]]; !ok {
return nil
} else {
return fork.Lookup(name[1:]...)
}
}
func (bus *simpleBus) Destroy() {
if bus.shutdown {
return
}
bus.shutdown = true
close(bus.queue)
for _, fork := range bus.forks {
fork.Destroy()
}
<- bus.shutdownComplete
}
func (bus *simpleBus) String() string {
if bus.parent != nil {
return bus.parent.String() + "/" + bus.name
}
return bus.name
}
func process(bus *simpleBus) {
for !bus.shutdown {
select {
case p, more := <- bus.queue:
if p != nil {
if p.topic == nil {
switch sub := p.message.(type) {
case *simpleSubscription:
delete(bus.consumers[sub.topic], sub)
}
} else {
if subs, ok := bus.consumers[p.topic]; ok {
for sub := range subs {
sub.queue <- p.message
}
}
}
p.message.Destroy()
}
if !more {
break
}
}
}
bus.shutdownComplete <- true
}
func makeBus(parent *simpleBus, name string) *simpleBus {
name = strings.TrimSpace(name)
if parent != nil && name == "" {
panic("Empty bus name")
}
newbus := &simpleBus{
parent: parent,
name: name,
consumers: make(map[interface{}]map[*simpleSubscription]bool),
forks: make(map[string]BusValue),
queue: make(chan *pair, 1),
shutdown: false,
shutdownComplete: make(chan bool),
}
go process(newbus)
return newbus
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment