Skip to content

Instantly share code, notes, and snippets.

@aktau
Created March 20, 2014 12:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aktau/9663194 to your computer and use it in GitHub Desktop.
Save aktau/9663194 to your computer and use it in GitHub Desktop.
Server-Sent-Events client in Go
package main
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
)
/* SseClient is a client for a Server-Sent-Events (SSE) endpoint. It holds
 * the state of one long-lived streaming GET (incoming messages) together
 * with the http.Client that is also used to POST outgoing messages.
 *
 * NOTE(review): the previous comment claimed this implements the Backend
 * interface on top of WebSockets. The code in this file speaks SSE over
 * plain HTTP and no Backend interface is visible here -- confirm. */
type SseClient struct {
/* URL of the streaming GET whose body ReceiveJson reads */
inUri string
/* URL that SendJson POSTs to */
outUri string
/* client used for the streaming GET and for the POSTs */
httpclient *http.Client
/* raw connection captured by the custom Dial function in NewSseClient, so
 * that Close() can tear it down directly */
conn net.Conn
/* the streaming GET request, kept so Close() can cancel it */
req *http.Request
/* response of the streaming GET; its Body is the event stream */
resp *http.Response
/* buffered reader wrapped around resp.Body */
reader *bufio.Reader
/* collects the "data" payload of the message currently being received */
buffer *bytes.Buffer
/* JSON decoder that reads from buffer */
dec *json.Decoder
}
/* NewSseClient opens the streaming (server -> client) GET request against
 * <protocol>://<remote>:<port>/<inLoc> and returns a client that POSTs its
 * outgoing messages to <protocol>://<remote>:<port>/<outLoc>.
 *
 * The protocol is "https" when tlsConf is non-nil and "http" otherwise.
 *
 * An error is returned when the request cannot be built or performed, or
 * when the server answers the streaming GET with anything but 200 OK (in
 * which case the response body is closed before returning). On success the
 * caller owns the open stream and must call Close() when done with it.
 *
 * No client timeout is configured: it would also bound the lifetime of the
 * event stream itself. */
func NewSseClient(remote string, port int, inLoc string, outLoc string, tlsConf *tls.Config) (*SseClient, error) {
	protocol := "https"
	if tlsConf == nil {
		protocol = "http"
	}

	var conn net.Conn

	/* we supply a custom Dial function to the http.Transport so we can
	 * intercept the underlying TCP connection and close it for real. See the
	 * Close() method for an explanation. */
	tr := &http.Transport{
		TLSClientConfig:   tlsConf,
		DisableKeepAlives: true,
		Dial: func(netw, addr string) (net.Conn, error) {
			c, err := net.Dial(netw, addr)
			if err != nil {
				return nil, err
			}

			/* if TCP conn, try to set linger; failing to do so is not fatal,
			 * the connection is still usable */
			if tcpConn, ok := c.(*net.TCPConn); ok {
				if err := tcpConn.SetLinger(0); err != nil {
					log.Println("sse: could not set linger on TCP connection:", err)
				}
			}

			/* intercept the net.Conn, keep a local reference */
			conn = c
			return c, nil
		},
	}
	httpclient := &http.Client{Transport: tr}

	/* open a request, can't use httpclient.Get because we need the
	 * http.Request so we can close the connection later */
	base := fmt.Sprintf("%s://%s:%d", protocol, remote, port)
	outUri := base + "/" + outLoc
	inUri := base + "/" + inLoc

	req, err := http.NewRequest("GET", inUri, nil)
	if err != nil {
		return nil, err
	}
	/* tell the server which representation we expect on this stream */
	req.Header.Set("Accept", "text/event-stream")

	/* open the streaming connection */
	resp, err := httpclient.Do(req)
	if err != nil {
		return nil, err
	}

	/* anything but a 200 is not an event stream (it is most likely an error
	 * page), so refuse to hand it out as one and don't leak the body */
	if resp.StatusCode != http.StatusOK {
		/* the status error below is the one worth reporting */
		_ = resp.Body.Close()
		return nil, fmt.Errorf("sse: did not get a 200 OK from %s, got a %s instead",
			inUri, resp.Status)
	}
	log.Println("sse: connected to", inUri)

	var buffer bytes.Buffer
	client := &SseClient{
		inUri:      inUri,
		outUri:     outUri,
		httpclient: httpclient,
		conn:       conn,
		req:        req,
		resp:       resp,
		reader:     bufio.NewReader(resp.Body),
		buffer:     &buffer,
	}
	client.dec = json.NewDecoder(client.buffer)

	return client, nil
}
/* Close tears down the streaming GET.
 *
 * All three teardown steps (closing the raw connection, closing the
 * response body, cancelling the request) are always attempted, even when
 * an earlier one fails. The returned error is the one reported by closing
 * the response body, if any; a failure to close the raw connection is only
 * logged. Fields that were never set are skipped instead of dereferenced. */
func (c *SseClient) Close() error {
	/* this part tries to close the underlying TCP connection, which we
	 * intercepted by replacing the standard Dial function while constructing
	 * the http.Transport. This seems to be the only reliable way of closing
	 * the connection even if the body hasn't been fully read yet. Thanks to
	 * @mynameisfiber on github for the tip:
	 * https://gist.github.com/mynameisfiber/2853066 */
	log.Println("sse/close: closing TCP connection")
	if c.conn != nil {
		if err := c.conn.Close(); err != nil {
			log.Println("sse/close: error, could NOT close TCP connection")
		}
	}

	/* note that the following doesn't seem to really close the body if
	 * there's still bytes to be read, so this is just a formal thing. The
	 * error is remembered rather than returned right away, so that the
	 * request is still cancelled below. */
	log.Println("sse/close: closing body")
	var bodyErr error
	if c.resp != nil && c.resp.Body != nil {
		bodyErr = c.resp.Body.Close()
	}

	/* NOTE: Transport.CancelRequest is deprecated in favour of requests
	 * carrying a cancellable context; switching over would also require
	 * changing how the request is created. */
	if c.httpclient != nil {
		tr, ok := c.httpclient.Transport.(*http.Transport)
		if !ok {
			log.Println("sse/close: could not cast httpclient transport")
		} else {
			log.Println("sse/close: second cancel")
			tr.CancelRequest(c.req)
		}
	}

	return bodyErr
}
/* ReceiveJson blocks until one complete SSE message has been read from the
 * stream and decodes the JSON found in its "data" field(s) into v.
 *
 * A SSE message is a run of "field: value" lines terminated by an empty
 * line, e.g.:
 *
 *     id: 12345\n
 *     event: quote\n
 *     data: GOOG\n
 *     data: 556\n\n
 *
 * Only "data" is used; several data lines are joined with '\n'. The "id",
 * "event" and "retry" fields and comment lines (starting with ':') are
 * skipped. The space after the colon is optional, lines may end in "\n" or
 * "\r\n" and may be arbitrarily long. An empty line that is not preceded by
 * any data (for instance after a keep-alive comment) does not count as a
 * message; reading simply continues.
 *
 * Returns the read error of the underlying stream (io.EOF once the server
 * has closed it) or the JSON decoding error of the message. A message that
 * fails to decode does not affect later calls. */
func (c *SseClient) ReceiveJson(v interface{}) error {
	/* if we return from an incomplete message, we better reset. If the
	 * message was complete, this isn't necessary, but it doesn't hurt anyway */
	defer c.buffer.Reset()

	log.Println("sse/recvjson: starting to receive")

	for {
		/* ReadBytes hands out a private copy of the whole line, whatever
		 * its length. ReadSlice would fail on lines larger than the reader's
		 * buffer and its result is invalidated by the next read. */
		raw, err := c.reader.ReadBytes('\n')
		if err != nil {
			/* also covers a stream that ends halfway through a message:
			 * the incomplete message is dropped */
			return err
		}
		line := bytes.TrimSuffix(bytes.TrimSuffix(raw, []byte("\n")), []byte("\r"))

		/* an empty line marks the end of a message */
		if len(line) == 0 {
			if c.buffer.Len() == 0 {
				/* nothing but comments/ignored fields so far: not a message */
				continue
			}
			log.Println("sse/recvjson: end of message received")
			break
		}

		cmd, payload := splitSseLine(line)
		log.Println("sse/recvjson: succesfully read SSE prefix:", cmd)
		log.Println("sse/recvjson: succesfully read SSE payload:",
			string(payload))

		switch cmd {
		case "data":
			log.Println("sse/recvjson: cmd was data, writing to buffer")
			/* writes to a bytes.Buffer never return an error */
			c.buffer.Write(payload)
			c.buffer.WriteByte('\n')
			log.Println("sse/recvjson: cmd was data, buffer written")
		case "", "id", "event", "retry":
			/* ignore ("" is a comment line) */
		default:
			log.Println("sse/recvjson: error, unknown cmd type,", cmd)
		}
	}

	log.Println("sse/recvjson: returning decoded message")

	/* decode with a fresh decoder for every message: a json.Decoder keeps
	 * returning its first error and holds on to unread input, so a shared
	 * one would let a single bad message break all the following ones */
	return json.NewDecoder(c.buffer).Decode(v)
}

/* splitSseLine splits one SSE line (line terminator already removed) into
 * the field name before the first ':' and the value after it, dropping a
 * single leading space from the value. A line without ':' is all field name
 * with an empty value; a comment line (leading ':') has an empty field name. */
func splitSseLine(line []byte) (field string, value []byte) {
	colon := bytes.IndexByte(line, ':')
	if colon < 0 {
		return string(line), nil
	}

	value = line[colon+1:]
	if len(value) > 0 && value[0] == ' ' {
		value = value[1:]
	}
	return string(line[:colon]), value
}
/* SendJson serialises v as JSON and delivers it to the server with a single
 * POST request (content type "application/json") to the client's outgoing
 * URL.
 *
 * An error is returned when v cannot be marshalled, when the request itself
 * fails, or when the server replies with any status other than 200 OK. */
func (c *SseClient) SendJson(v interface{}) error {
	/* TODO: convert to more efficient encoder */
	payload, marshalErr := json.Marshal(v)
	if marshalErr != nil {
		return fmt.Errorf("sse/sendjson: error while marshalling %v: %v",
			v, marshalErr)
	}

	body := bytes.NewReader(payload)
	reply, postErr := c.httpclient.Post(c.outUri, "application/json", body)
	if postErr != nil {
		return postErr
	}
	defer reply.Body.Close()

	log.Println("sse/sendjson: sent a POST request to", c.outUri)

	/* only a plain 200 counts as delivered */
	if reply.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("sse/sendjson: did not get a 200 OK, got a %s instead",
		reply.Status)
}
Copy link

ghost commented Mar 22, 2014

From the net/http package, use the Request.Write method and the ReadResponse function to construct your own HTTP client. The code will almost certainly be shorter than this gist.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment