Skip to content

Instantly share code, notes, and snippets.

@elianka
Last active March 22, 2017 02:03
Show Gist options
  • Save elianka/1b54203f581665f2ad5bfd0610183923 to your computer and use it in GitHub Desktop.
Save elianka/1b54203f581665f2ad5bfd0610183923 to your computer and use it in GitHub Desktop.
marathon sse subscribe
package utils
import (
"encoding/base64" //for marathon auth
)
// encrypts the str with base64
func Base64Encode(str string) string {
return base64.StdEncoding.EncodeToString([]byte(str))
}
// decrypts the str with base64
func Base64Decode(str string) (string, error) {
b, err := base64.StdEncoding.DecodeString(str)
return string(b), err
}
package main
import (
"fmt"
_ "net/url"
//"time"
//"io/ioutil"
"bufio"
"bytes"
"net/http"
"github.com/umesos/utils"
)
var (
headerData = []byte("data:")
headerEvent = []byte("event:")
)
func httpGetMarathonEventSSE(url string, auth string, ch eventChannel) (error) {
//fmt.Println(">>>>>>>>"+data)
//t := time.Duration(TIMEOUT) * time.Second
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
authbase64 := "Basic "+utils.Base64Encode(auth)
req.Header.Set("Authorization", authbase64)
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Connection", "keep-alive")
resp, err := client.Do(req)
if err == nil {
defer resp.Body.Close() //elimite the situation that err != nil
reader := bufio.NewReader(resp.Body)
for {
// Read each new line and process the type of event
data, err2 := reader.ReadBytes('\n')
if err2 == nil {
//ignore empty line and event line
if (data[0] == 0xd && data[1] == 0xa) || (bytes.HasPrefix(data, headerEvent)){
continue
}
//TODO, add more process functions
//line := append(event, data...)
if bytes.Contains(data, []byte("\"eventType\":\"event_stream_attached")) {
ch["subscribe_event"] <- data
} else if bytes.Contains(data, []byte("\"eventType\":\"status_update_event")) {
ch["status_update_event"] <- data
} else if bytes.Contains(data, []byte("\"eventType\":\"deployment_")) {
//ch["deployment"] <- data
} else {
//ch["unknown"] <- data
}
} else {
fmt.Println("receive error. close channel.")
//close(ch)
return err
}
}
fmt.Println("connect closed.")
return err //never reach here
} else {
fmt.Println("HTTP request error. close channel.")
//close(ch)
return err
}
}
type eventChannel map[string](chan []byte)
func main(){
url := "http://192.168.153.216:8080/v2/events"
auth := "xxx:xxx"
//events1 := make(chan []byte)
ch := make(eventChannel)
ch["status_update_event"] = make(chan []byte)
ch["subscribe_event"] = make(chan []byte)
ch["deployment"] = make(chan []byte)
ch["unknown"] = make(chan []byte)
go httpGetMarathonEventSSE(url, auth, ch)
//receive events
for {
var msg []byte
var ok bool
select {
case msg, ok =<-ch["status_update_event"] :
if ok == true {
fmt.Printf("receive events: status_update_event\n%s", string(msg))
}
case msg, ok =<-ch["subscribe_event"] :
if ok == true {
fmt.Printf("receive events: subscribe_event\n%s", string(msg))
}
case msg, ok =<-ch["deployment"] :
if ok == true {
fmt.Printf("receive events: deployment_event\n%s", string(msg))
}
case msg, ok =<-ch["unknown"] :
if ok == true {
fmt.Printf("receive events: unknown\n%s", string(msg))
}
default :
continue
}
}
//fmt.Println(string(rtn), err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment