Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of using Redis PubSub and EventSource with golang
package main
import (
	"log"
	"net/http"
	"strings"
	"sync"

	eventsource "github.com/antage/eventsource/http"
	redis "github.com/vmihailenco/redis"
)
// haltOnErr panics with err when err is non-nil and does nothing otherwise.
func haltOnErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// subscriptionHandler is an http.Handler (see its ServeHTTP method) that keeps
// one subscription per requested channel name.
type subscriptionHandler struct {
// index holds the subscriptions created so far, keyed by the channel name
// derived from the request path.
index map[string]subscription
}
// subscription ties one Redis pub/sub channel to the EventSource that relays
// its messages to HTTP clients. The handler stores one subscription per
// channel name derived from the request URL.
//
// NOTE: createSubscription builds this struct with a positional literal, so
// the field order below must not change.
type subscription struct {
// pubsub is the Redis pub/sub client the channel was subscribed on.
pubsub * redis.PubSubClient
// es is the EventSource that received messages are sent to.
es eventsource.EventSource
// ch is the channel returned by pubsub.Subscribe; messages arrive on it.
ch chan * redis.Message
// pubChan is the name of the subscribed Redis channel.
pubChan string
}
// createSubscription subscribes to the Redis channel named by *pubChan,
// creates a fresh EventSource for it, and stores both in sh.index under that
// name.
//
// The Redis address ":6379" is hard-coded. The remaining NewTCPClient
// arguments are presumably the password and database selector; confirm
// against the redis package before changing them.
//
// Any failure panics via haltOnErr. The caller is responsible for
// synchronizing access to sh.index.
func createSubscription(sh *subscriptionHandler, pubChan *string) {
	name := *pubChan
	log.Printf("creating channel %s", name)

	pubsub, err := redis.NewTCPClient(":6379", "", -1).PubSubClient()
	haltOnErr(err)

	ch, err := pubsub.Subscribe(name)
	if err != nil {
		// Best-effort cleanup: when this runs inside an HTTP handler the
		// panic below is recovered by net/http, so without Close the
		// half-initialised client would be abandoned while the process
		// keeps running.
		pubsub.Close()
		haltOnErr(err)
	}

	sh.index[name] = subscription{
		pubsub:  pubsub,
		es:      eventsource.New(nil),
		ch:      ch,
		pubChan: name,
	}
}
// listen forwards every message received on the subscription's Redis channel
// to its EventSource and logs each delivery together with the current
// consumer count.
//
// It blocks until index.ch is closed, so callers normally run it in its own
// goroutine. Ranging over the channel (instead of a bare receive in an
// endless loop) matters: a receive on a closed channel returns nil
// immediately, which would otherwise dereference a nil message and crash the
// whole process from this goroutine.
func listen(index subscription) {
	for msg := range index.ch {
		if msg == nil {
			// Nothing to forward; skip rather than panic on msg.Message.
			continue
		}
		index.es.SendMessage(msg.Message, "", "")
		log.Printf("message has been sent on %s (consumers: %d)", index.pubChan, index.es.ConsumersCount())
	}
}
// indexMu serializes access to subscriptionHandler.index. net/http calls
// ServeHTTP from a separate goroutine per connection, and Go maps must not be
// read and written concurrently.
var indexMu sync.Mutex

// subscriptionFor returns the subscription for pubChan, creating it and
// starting its single listen goroutine on first use.
func (sh *subscriptionHandler) subscriptionFor(pubChan string) subscription {
	indexMu.Lock()
	// Deferred so the lock is released even if createSubscription panics:
	// net/http recovers handler panics, and a lock left held would block
	// every later request.
	defer indexMu.Unlock()

	if sub, ok := sh.index[pubChan]; ok {
		return sub
	}
	createSubscription(sh, &pubChan)
	sub := sh.index[pubChan]
	// Exactly one forwarding goroutine per subscription; starting one per
	// request would leak a goroutine for every client that connects.
	go listen(sub)
	return sub
}

// ServeHTTP attaches the requesting client to the EventSource of the Redis
// channel named by the request path.
//
// The channel name is the URL path without its leading slash, so with the
// handler mounted at "/events/" a request for /events/foo uses the Redis
// channel "events/foo".
//
// Subscriptions are shared by every client of a channel and are kept for the
// lifetime of the process. They are deliberately not closed when a request
// ends: the entry stays in the index, so closing it would hand already-closed
// resources to every later request for the same channel.
// NOTE(review): each distinct path therefore holds one Redis subscription
// until the process exits; consider limiting or validating channel names.
func (sh *subscriptionHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	// TrimPrefix instead of Path[1:] so an empty path cannot panic.
	pubChan := strings.TrimPrefix(req.URL.Path, "/")
	sub := sh.subscriptionFor(pubChan)
	log.Printf("subscribed to %s", pubChan)
	sub.es.ServeHTTP(resp, req)
}
// main mounts a subscriptionHandler under "/events/" on the default mux and
// serves HTTP on port 8080. It panics (via haltOnErr) with whatever error
// stops the server.
func main() {
	streamer := &subscriptionHandler{index: make(map[string]subscription)}
	http.Handle("/events/", streamer)
	haltOnErr(http.ListenAndServe(":8080", nil))
}
// Subscribes to the server-sent event stream at /events/<channel> and logs
// incoming messages, the open event, and a closed connection to the console.
//
// Browsers without native EventSource need a polyfill such as
// https://github.com/Yaffle/EventSource
function listen(channel) {
  if (!window.EventSource) {
    // No native support: resort to XHR polling (not implemented here).
    // Return early; without a source there is nothing to attach listeners to.
    return;
  }

  var source = new EventSource('/events/' + channel);

  source.addEventListener('message', function (e) {
    console.log(e.data);
  }, false);

  source.addEventListener('open', function () {
    console.log("opened channel on " + channel);
  }, false);

  source.addEventListener('error', function () {
    // readyState is a property of the EventSource, not of the error event,
    // so it has to be read from the source itself.
    if (source.readyState === EventSource.CLOSED) {
      console.log("closed channel on " + channel);
    }
  }, false);
}
@druska

This comment has been minimized.

Copy link

commented Dec 6, 2013

There is no need to pass immutable strings as pointers. You can also use redis to handle the subscriptions instead of a global variable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.