Skip to content

Instantly share code, notes, and snippets.

@snaury
Created May 16, 2012 21:28
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save snaury/2714090 to your computer and use it in GitHub Desktop.
Save snaury/2714090 to your computer and use it in GitHub Desktop.
Non-blocking fan-out broker in go
package main
import (
"fmt"
"time"
)
type Message struct {
Timestamp time.Time
Client *Client
Value string
}
type Client struct {
Inbox <-chan []*Message
Outbox chan<- *Message
}
type Broker struct {
Timeout time.Duration
reqadd chan chan<- *Client
reqremove chan *Client
}
func NewBroker() *Broker {
b := &Broker{
Timeout: 60 * time.Second,
reqadd: make(chan chan<- *Client),
reqremove: make(chan *Client),
}
go func() {
var outbox = make(chan *Message, 64)
var clients = make(map[*Client]chan []*Message)
for {
select {
case reply := <-b.reqadd:
inbox := make(chan []*Message, 1)
client := &Client{
Inbox: inbox,
Outbox: outbox,
}
clients[client] = inbox
reply <- client
case client := <-b.reqremove:
if inbox, ok := clients[client]; ok {
delete(clients, client)
close(inbox)
}
case message := <-outbox:
// Slow clients scheduled to be discarded
var slow []*Client
// Inbox older than this is deemed slow
expired := time.Now().Add(-b.Timeout)
// Iterate over all clients and their inboxes
for client, inbox := range(clients) {
if client == message.Client {
// Avoid echoing messages
continue
}
// Pending messages in the inbox
var pending []*Message
// Inbox is either empty, in which case
// new slice will be created, or there's
// exactly one element with a slice of
// pending messages, where a new element
// will be added at the end
select {
case pending = <-inbox:
default:
}
// Check for expiration
if len(pending) > 0 && pending[0].Timestamp.Before(expired) {
// Client is too slow, but we cannot delete
// it right now, so schedule it for later
slow = append(slow, client)
} else {
// Overwise append the message and put it
// into the channel. This is safe and won't
// block, since we are the only ones writing
// to it, and it was cleared above
inbox <- append(pending, message)
}
}
// Discard all slow clients
for _, client := range(slow) {
inbox := clients[client]
delete(clients, client)
close(inbox)
}
}
}
}()
return b
}
func (b *Broker) NewClient() *Client {
reply := make(chan *Client, 1)
b.reqadd <- reply
return <-reply
}
func (b *Broker) RemoveClient(client *Client) {
b.reqremove <- client
}
var broker = NewBroker()
func main() {
ready := make(chan struct{})
go func() {
client := broker.NewClient()
defer broker.RemoveClient(client)
close(ready)
for batch := range(client.Inbox) {
for _, message := range(batch) {
fmt.Printf("%v %s\n", message.Timestamp, message.Value)
}
fmt.Printf("...end of batch\n")
}
fmt.Printf("...disconnected\n")
}()
client := broker.NewClient()
<-ready
client.Outbox <- &Message{time.Now(), client, "Hello world!"}
time.Sleep(time.Nanosecond)
client.Outbox <- &Message{time.Now(), client, "Are you slow?"}
time.Sleep(time.Second)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment