Skip to content

Instantly share code, notes, and snippets.

@mcculloughsean
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcculloughsean/ad6b97030e98cca3c113 to your computer and use it in GitHub Desktop.
Save mcculloughsean/ad6b97030e98cca3c113 to your computer and use it in GitHub Desktop.
Implementing an event bus proxy in node and go

Event Bus Proxy

Create a server that handles periodic POST requests from another service.

Clients can connect to this server and poll for events using Transfer-Encoding: chunked. Events that are POSTed to the server are returned to the polling clients as individual chunks.

The Node implementation works as expected.

I can't figure out how to get the Go implementation to work correctly. The response is closed after the goroutine declaration finishes. I think this will work if I can figure out how to defer closing the connection, but there's no direct writable interface on the ResponseWriter.

package main
import (
"container/list"
"fmt"
"io/ioutil"
"net/http"
"os"
)
var marathonBaseUrl = os.Getenv("MARATHON_BASE")
var chans list.List
func main() {
if marathonBaseUrl == "" {
marathonBaseUrl = "http://localhost:8080/v2"
}
http.HandleFunc("/", eventBusRecievingHandler)
http.HandleFunc("/poll", eventBusSendHandler)
http.ListenAndServe(":8001", nil)
}
func eventBusRecievingHandler(w http.ResponseWriter, r *http.Request) {
message, err := ioutil.ReadAll(r.Body)
fmt.Println("Request: ", r)
if err != nil {
fmt.Println("Got err", err)
fmt.Fprintf(w, "%s", err)
}
for e := chans.Front(); e != nil; e = e.Next() {
e.Value.(chan []byte) <- message
}
fmt.Println("Writing line", string(message))
fmt.Fprintf(w, "%s", message)
}
func eventBusSendHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("writer %#v", w)
for k, v := range r.Header {
for _, v2 := range v {
fmt.Printf("%s: %s\n", k, v2)
}
}
w.(http.Flusher).Flush()
ch := make(chan []byte)
chanElement := chans.PushBack(ch)
var message []byte
_, statusErr := w.Write([]byte("{status: \"ok\"}"))
w.(http.Flusher).Flush()
if statusErr != nil {
fmt.Printf("Can't write to socket, removing channel")
chans.Remove(chanElement)
close(ch)
}
for message = range ch {
fmt.Printf("Writing line %s to channel %s\n\n", string(message), ch)
_, err := w.Write(message)
w.(http.Flusher).Flush()
if err != nil {
fmt.Printf("Can't write to socket, removing channel")
chans.Remove(chanElement)
close(ch)
}
}
}
'use strict';
var http = require('http');
var concat = require('concat-stream');
var without = require('lodash').without;
function channelBroadcaster (){
var channels = [];
function handleEventBusMessage (req, res) {
console.log("Handle Message");
var write = concat(function(data) {
console.log("Got body", JSON.parse(data));
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('okay');
channels.forEach(function (chan) {
chan.write(data + '\n');
});
});
req.pipe(write);
}
function handleEventBusBroadcast (req, res) {
console.log("Handle broadcast");
channels.push(res);
res.on('end', function () {
channels = without(channels, res);
});
res.on('error', function () {
channels = without(channels, res);
});
res.write('{status: "ok"}\n');
}
return {
handleEventBusBroadcast: handleEventBusBroadcast,
handleEventBusMessage: handleEventBusMessage
};
}
var i = channelBroadcaster();
var server = http.createServer(function (req, res) {
if (req.method === "POST") {
i.handleEventBusMessage(req,res);
} else {
i.handleEventBusBroadcast(req,res);
}
});
server.listen(9000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment