Created
May 3, 2017 14:14
-
-
Save iamtakingiteasy/1f81ecb162fd65a5c5a5f6cd42442790 to your computer and use it in GitHub Desktop.
Simple go-ish message bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bus | |
import ( | |
"fmt" | |
"testing" | |
) | |
type intMessage struct { | |
value int | |
} | |
func (msg *intMessage) Inspect() interface{} { | |
return msg.value | |
} | |
func (msg *intMessage) Destroy() { | |
// not much to do | |
} | |
func TestBus(t *testing.T) { | |
rootBus := MakeRootBus() | |
t.Run("BusTearDown", func(t *testing.T) { | |
bus := rootBus.Fork("mybus") | |
bus.Destroy() | |
}) | |
t.Run("BusSubBus", func(t *testing.T) { | |
bus := rootBus.Fork("mybus") | |
if fmt.Sprint(rootBus) != "(root)" { | |
t.Fail() | |
} | |
if fmt.Sprint(bus) != "(root)/mybus" { | |
t.Fail() | |
} | |
if rootBus.Lookup("mybus") != bus { | |
t.Fail() | |
} | |
bus.Destroy() | |
}) | |
t.Run("BusSendReceive", func(t *testing.T) { | |
bus := rootBus.Fork("mybus") | |
subscription := bus.Subscribe("abc") | |
if subscription.Topic() != "abc" { | |
t.Fail() | |
} | |
reference := &intMessage{123} | |
done := make(chan bool, 1) | |
go func() { | |
msg := subscription.Receive() | |
if msg != reference || msg.Inspect() != reference.Inspect() { | |
t.Fail() | |
} | |
subscription.Unsubscribe() | |
done <- true | |
close(done) | |
}() | |
bus.Publish("abc", reference) | |
<-done | |
func() { | |
defer func() { | |
recover() | |
}() | |
subscription.Receive() | |
t.Fail() | |
}() | |
bus.Destroy() | |
}) | |
rootBus.Destroy() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bus | |
// Marks destroyable instance | |
type Destroyable interface { | |
// Destroys object and returns all acquired resources | |
Destroy() | |
} | |
// Marks message message | |
type Message interface { | |
// Returns immutable (copy) data inside | |
Inspect() interface{} | |
} | |
// Marks publishable message message | |
type MessageValue interface { | |
Message | |
Destroyable | |
} | |
// Actual subscription | |
type Subscription interface { | |
// Unsubscribes from bus | |
Unsubscribe() | |
// Receives a message (just like <- chan) | |
Receive() Message | |
// Returns topic on which it was subscribed | |
Topic() interface{} | |
} | |
// Message bus | |
type Bus interface { | |
// Publishes destroyable message onto this bus with given topic | |
Publish(topic interface{}, value MessageValue) | |
// Subsribes to given topic | |
Subscribe(topic interface{}) Subscription | |
// Forks given bus, making child sub-bus (messages are not inhereited) | |
Fork(name string) BusValue | |
// Look up a child sub-bus(es) | |
Lookup(name ...string) Bus | |
} | |
// Destroyable bus for the bus owner | |
type BusValue interface { | |
Bus | |
Destroyable | |
} | |
// Returns new root-level bus | |
func MakeRootBus() BusValue { | |
return makeBus(nil, "(root)") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bus | |
import ( | |
"strings" | |
) | |
// topic+message pair | |
type pair struct { | |
topic interface{} | |
message MessageValue | |
} | |
// simple subscription | |
type simpleSubscription struct { | |
bus *simpleBus | |
topic interface{} | |
queue chan Message | |
valid bool | |
} | |
func (sub *simpleSubscription) Unsubscribe() { | |
sub.valid = false | |
sub.bus.queue <- &pair{nil, sub} | |
} | |
func (sub *simpleSubscription) Inspect() interface{} { | |
return *sub | |
} | |
func (sub *simpleSubscription) Destroy() { | |
close(sub.queue) | |
sub.topic = nil | |
sub.bus = nil | |
} | |
func (sub *simpleSubscription) Receive() Message { | |
if !sub.valid { | |
panic("Receiving on unsubscribed subscription") | |
} | |
return <- sub.queue | |
} | |
func (sub *simpleSubscription) Topic() interface{} { | |
return sub.topic | |
} | |
type simpleBus struct { | |
parent *simpleBus | |
name string | |
consumers map[interface{}]map[*simpleSubscription]bool | |
forks map[string]BusValue | |
queue chan *pair | |
shutdown bool | |
shutdownComplete chan bool | |
} | |
func (bus *simpleBus) Publish(topic interface{}, value MessageValue) { | |
if bus.shutdown { | |
panic("Publishing on shutdowned bus") | |
} | |
if topic == nil { | |
panic("Publishing on nil topic") | |
} | |
if value == nil { | |
panic("Publishing nil message value") | |
} | |
bus.queue <- &pair{topic, value} | |
} | |
func (bus *simpleBus) Subscribe(topic interface{}) Subscription { | |
if topic == nil { | |
panic("Subscribing on nil topic") | |
} | |
s := &simpleSubscription{bus, topic, make(chan Message), true} | |
tmap, ok := bus.consumers[topic] | |
if !ok { | |
tmap = make(map[*simpleSubscription]bool) | |
bus.consumers[topic] = tmap | |
} | |
tmap[s] = true | |
return s | |
} | |
func (bus *simpleBus) Fork(name string) BusValue { | |
newbus := makeBus(bus, name) | |
bus.forks[name] = newbus | |
return newbus | |
} | |
func (bus *simpleBus) Lookup(name ...string) Bus { | |
if len(name) == 0 { | |
return bus | |
} | |
if fork, ok := bus.forks[name[0]]; !ok { | |
return nil | |
} else { | |
return fork.Lookup(name[1:]...) | |
} | |
} | |
func (bus *simpleBus) Destroy() { | |
if bus.shutdown { | |
return | |
} | |
bus.shutdown = true | |
close(bus.queue) | |
for _, fork := range bus.forks { | |
fork.Destroy() | |
} | |
<- bus.shutdownComplete | |
} | |
func (bus *simpleBus) String() string { | |
if bus.parent != nil { | |
return bus.parent.String() + "/" + bus.name | |
} | |
return bus.name | |
} | |
func process(bus *simpleBus) { | |
for !bus.shutdown { | |
select { | |
case p, more := <- bus.queue: | |
if p != nil { | |
if p.topic == nil { | |
switch sub := p.message.(type) { | |
case *simpleSubscription: | |
delete(bus.consumers[sub.topic], sub) | |
} | |
} else { | |
if subs, ok := bus.consumers[p.topic]; ok { | |
for sub := range subs { | |
sub.queue <- p.message | |
} | |
} | |
} | |
p.message.Destroy() | |
} | |
if !more { | |
break | |
} | |
} | |
} | |
bus.shutdownComplete <- true | |
} | |
func makeBus(parent *simpleBus, name string) *simpleBus { | |
name = strings.TrimSpace(name) | |
if parent != nil && name == "" { | |
panic("Empty bus name") | |
} | |
newbus := &simpleBus{ | |
parent: parent, | |
name: name, | |
consumers: make(map[interface{}]map[*simpleSubscription]bool), | |
forks: make(map[string]BusValue), | |
queue: make(chan *pair, 1), | |
shutdown: false, | |
shutdownComplete: make(chan bool), | |
} | |
go process(newbus) | |
return newbus | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment