Skip to content

Instantly share code, notes, and snippets.

@gburgett
Created April 27, 2015 21:40
Show Gist options
  • Save gburgett/2e01236c11f3cd2e0c7f to your computer and use it in GitHub Desktop.
Save gburgett/2e01236c11f3cd2e0c7f to your computer and use it in GitHub Desktop.
A broadcast channel implementation in Go.
/*
Package broadcast implements a broadcasting channel. The broadcaster
creates or wraps a channel which when written to is replicated to all attached
receivers.
Example:

	ticker := time.NewTicker(1 * time.Millisecond)
	b, ch := broadcast.NewBroadcaster()
	go func() {
		for v := range ticker.C {
			ch <- v
		}
	}()
	for i := 0; i < 10; i++ {
		go func(i int, recv <-chan interface{}) {
			for v := range recv {
				// ... do something with v every millisecond on 10 different goroutines
			}
		}(i, b.Receive())
	}
*/
package broadcast

import "sync"
// Broadcaster replicates every value read from a source channel to all
// receiver channels handed out by Receive.
//
// A Broadcaster must be created with NewBroadcaster or Broadcast and must not
// be copied after first use (it contains a sync.Mutex and a sync.Once).
type Broadcaster struct {
	// mu guards publisher, receive and closed.
	mu sync.Mutex
	// publisher is the write end created by NewBroadcaster. It is nil when the
	// broadcaster wraps a caller-owned channel, and is reset to nil once it
	// has been closed so that it is never closed twice by this package.
	publisher chan<- interface{}
	// receive is the head of a prepend-only singly linked list of receivers.
	// Nodes are never modified after being linked in, so a head pointer read
	// under mu can be walked without holding the lock.
	receive *receiver
	// closed is set by the pump goroutine once it has stopped delivering
	// values; receivers requested afterwards are returned already closed.
	closed bool
	// quit is closed by Close to stop the pump goroutine and to abort any
	// delivery that is blocked on a slow receiver.
	quit chan struct{}
	// quitOnce makes closing quit idempotent.
	quitOnce sync.Once
}

// receiver is one node in the Broadcaster's list of attached receivers.
type receiver struct {
	v      chan interface{} // channel handed to the consumer
	b      *Broadcaster     // owning broadcaster
	next   *receiver        // next (older) receiver in the list
	closed bool             // set when v has been closed
}

// NewBroadcaster creates a new broadcaster together with its write channel.
// Values written to the write channel are propagated to every receiver
// channel. Closing the write channel, or calling Close, closes all receiver
// channels.
func NewBroadcaster() (*Broadcaster, chan<- interface{}) {
	ch := make(chan interface{})
	b := &Broadcaster{
		publisher: ch,
		quit:      make(chan struct{}),
	}
	runBroadcaster(b, ch)
	return b, ch
}

// Broadcast creates a new broadcaster around the given channel. All values
// read from the channel are propagated to every receiver channel. The given
// channel is never closed by the broadcaster; it must be closed independently.
// Calling Close stops the broadcaster from reading the channel any further.
func Broadcast(ch <-chan interface{}) *Broadcaster {
	b := &Broadcaster{quit: make(chan struct{})}
	runBroadcaster(b, ch)
	return b
}

// runBroadcaster starts the pump goroutine for b. The goroutine forwards
// values from ch to the receivers until ch is closed or b.quit is closed, and
// then closes every receiver channel.
//
// The pump goroutine is the only sender on receiver channels and therefore the
// only goroutine that closes them; this guarantees a receiver channel is never
// closed while a send on it is in flight.
func runBroadcaster(b *Broadcaster, ch <-chan interface{}) {
	go func() {
		sourceClosed := false
	pump:
		for {
			select {
			case <-b.quit:
				break pump
			case v, ok := <-ch:
				if !ok {
					sourceClosed = true
					break pump
				}
				propagate(b, v)
			}
		}

		b.mu.Lock()
		if sourceClosed {
			// The source is already closed; make sure Close does not try to
			// close it a second time.
			b.publisher = nil
		}
		b.closed = true
		head := b.receive
		b.receive = nil
		b.mu.Unlock()

		for n := head; n != nil; n = n.next {
			n.closed = true
			close(n.v)
		}
	}()
}

// propagate delivers v to every receiver attached to b at the moment of the
// call, most recently attached first. Each send blocks until the receiver
// takes the value; delivery is abandoned as soon as b.quit is closed.
func propagate(b *Broadcaster, v interface{}) {
	// Snapshot the head under the lock; the list is prepend-only, so the
	// snapshot can be walked without blocking concurrent Receive calls.
	b.mu.Lock()
	head := b.receive
	b.mu.Unlock()

	for n := head; n != nil; n = n.next {
		select {
		case n.v <- v:
		case <-b.quit:
			return
		}
	}
}

// Close shuts the broadcaster down: it closes the write channel created by
// NewBroadcaster (if it is still open) and causes all receiver channels to be
// closed by the pump goroutine shortly afterwards. Close is idempotent and
// safe to call from multiple goroutines.
//
// As with any channel, goroutines still sending on the write channel when it
// is closed will panic, and closing the write channel directly at the same
// time as calling Close may close it twice; do one or the other.
func (b *Broadcaster) Close() {
	b.quitOnce.Do(func() {
		// quit is nil only for a Broadcaster not built by a constructor.
		if b.quit != nil {
			close(b.quit)
		}
	})

	b.mu.Lock()
	pub := b.publisher
	b.publisher = nil
	b.mu.Unlock()

	if pub != nil {
		close(pub)
	}
}

// Receive returns a new receiver channel. Every receiver channel receives all
// values sent to the publisher after it was attached. If the broadcaster has
// already shut down, the returned channel is already closed.
func (b *Broadcaster) Receive() <-chan interface{} {
	r := &receiver{
		v: make(chan interface{}),
		b: b,
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		// The pump goroutine has exited and will never close this channel,
		// so close it here instead of leaving the consumer blocked forever.
		r.closed = true
		close(r.v)
		return r.v
	}

	r.next = b.receive
	b.receive = r
	return r.v
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment