Skip to content

Instantly share code, notes, and snippets.

@exu
Forked from dudochkin-victor/fssse.go
Created November 9, 2021 15:09
Show Gist options
  • Save exu/68604d5c115838ebc98cab88c1884011 to your computer and use it in GitHub Desktop.
Save exu/68604d5c115838ebc98cab88c1884011 to your computer and use it in GitHub Desktop.
fasthttp sse
package main
import (
"bufio"
"fmt"
"log"
"time"
"github.com/valyala/fasthttp"
)
func main() {
h := requestHandler
if err := fasthttp.ListenAndServe(":9004", h); err != nil {
log.Fatalf("Error in ListenAndServe: %s", err)
}
}
// requestHandler answers a request with a Server-Sent Events stream.
//
// It sets the event-stream and CORS response headers, then registers a body
// stream writer that emits one "data: Message: ..." event every five seconds.
// The stream writer returns as soon as writing or flushing fails, so it does
// not keep looping after the peer can no longer be written to.
func requestHandler(ctx *fasthttp.RequestCtx) {
	fmt.Println("HERE")
	// ctx.SetContentType("text/event-stream; charset=utf8")
	ctx.SetContentType("text/event-stream")
	ctx.Response.Header.Set("Cache-Control", "no-cache")
	ctx.Response.Header.Set("Connection", "keep-alive")
	ctx.Response.Header.Set("Transfer-Encoding", "chunked")
	// NOTE(review): browsers reject a wildcard origin on credentialed
	// requests; confirm this header combination is intended.
	ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
	ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
	ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")
	ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
		fmt.Println("WRITER")
		for i := 1; ; i++ {
			msg := fmt.Sprintf("%d - the time is %v", i, time.Now())
			if _, err := fmt.Fprintf(w, "data: Message: %s\n\n", msg); err != nil {
				fmt.Println("stream write failed:", err)
				return
			}
			fmt.Println(msg)
			// Flush pushes the event out immediately. A bufio.Writer keeps
			// returning its first write error, so a failure here means the
			// stream is dead and the loop must end instead of spinning.
			if err := w.Flush(); err != nil {
				fmt.Println("stream flush failed:", err)
				return
			}
			time.Sleep(5 * time.Second)
		}
	}))
}
// Registered from a revel.AddInitEventHandler callback, on the
// revel.ENGINE_BEFORE_INITIALIZED event, with:
// revel.AddHTTPMux("/streaming", streamingHandler)
// This does not work yet.
// streamingHandler is a fasthttp.RequestHandler that answers a request with a
// Server-Sent Events stream.
//
// It sets the event-stream and CORS response headers, then registers a body
// stream writer that emits one "data: Message: ..." event every five seconds.
// The stream writer returns as soon as writing or flushing fails, so it does
// not keep looping after the peer can no longer be written to.
var streamingHandler = fasthttp.RequestHandler(func(ctx *fasthttp.RequestCtx) {
	fmt.Println("HERE")
	// ctx.SetContentType("text/event-stream; charset=utf8")
	ctx.SetContentType("text/event-stream")
	ctx.Response.Header.Set("Cache-Control", "no-cache")
	ctx.Response.Header.Set("Connection", "keep-alive")
	ctx.Response.Header.Set("Transfer-Encoding", "chunked")
	// NOTE(review): browsers reject a wildcard origin on credentialed
	// requests; confirm this header combination is intended.
	ctx.Response.Header.Set("Access-Control-Allow-Origin", "*")
	ctx.Response.Header.Set("Access-Control-Allow-Headers", "Cache-Control")
	ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true")
	ctx.SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
		fmt.Println("WRITER")
		for i := 1; ; i++ {
			msg := fmt.Sprintf("%d - the time is %v", i, time.Now())
			if _, err := fmt.Fprintf(w, "data: Message: %s\n\n", msg); err != nil {
				fmt.Println("stream write failed:", err)
				return
			}
			fmt.Println(msg)
			// Flush pushes the event out immediately. A bufio.Writer keeps
			// returning its first write error, so a failure here means the
			// stream is dead and the loop must end instead of spinning.
			if err := w.Flush(); err != nil {
				fmt.Println("stream flush failed:", err)
				return
			}
			time.Sleep(5 * time.Second)
		}
	}))
})
// streamingGohttpHandler is the net/http version of the SSE demo handler.
//
// It replies with 500 when the ResponseWriter cannot flush. Otherwise it sets
// the event-stream and CORS headers and sends up to 99 "data: Message: ..."
// events, five seconds apart. Streaming ends early when a write fails or when
// the request context is done, so a disconnected client does not keep the
// handler alive for the remaining iterations.
var streamingGohttpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	fmt.Println("HERE")
	f, ok := w.(http.Flusher)
	if !ok {
		fmt.Println("Streaming unsupported!", http.StatusInternalServerError)
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}
	fmt.Println("Streaming supported")

	// Set the headers related to event streaming.
	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Transfer-Encoding", "chunked")
	// NOTE(review): browsers reject a wildcard origin on credentialed
	// requests; confirm this header combination is intended.
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Access-Control-Allow-Headers", "Cache-Control")
	hdr.Set("Access-Control-Allow-Credentials", "true")

	// The loop is bounded so this can be used as a plain handler in
	// development mode: the page uses EventSource, which reconnects after
	// the connection is closed.
	done := r.Context().Done()
	for i := 1; i < 100; i++ {
		msg := fmt.Sprintf("%d - the time is %v", i, time.Now())
		if _, err := fmt.Fprintf(w, "data: Message: %s\n\n", msg); err != nil {
			fmt.Println("stream write failed:", err)
			return
		}
		fmt.Println(msg)
		// Flush so the event reaches the client now rather than when the
		// handler returns.
		f.Flush()

		// Wait for the next tick, but give up immediately once the request
		// is cancelled (for example because the client went away).
		select {
		case <-done:
			return
		case <-time.After(5 * time.Second):
		}
	}
})
package models
// I would like to use SSE as a revel.Result, something like:
// func (ctrl EventSource) Index() revel.Result {
// return models.SSEClient{}
// }
// This is not working at the moment.
import (
"fmt"
"io"
"net/http"
"time"
"github.com/revel/revel"
)
// SSEResponse wraps an io.Writer and exposes only Write plus a Close that
// does nothing to the wrapped writer, hiding any Closer the wrapped value
// might implement.
type SSEResponse struct {
	writer io.Writer
}

// Write logs the call to stdout and forwards p to the wrapped writer,
// returning that writer's byte count and error unchanged.
func (s SSEResponse) Write(p []byte) (int, error) {
	fmt.Println("SSE WRITE")
	written, werr := s.writer.Write(p)
	return written, werr
}

// Close logs the call to stdout and always reports success; the wrapped
// writer is left untouched.
func (s SSEResponse) Close() error {
	fmt.Println("SSE CLOSE")
	return nil
}
// SSEClient is an empty, stateless type whose Apply method streams
// server-sent events to the response writer.
type SSEClient struct {
}
// Apply streams server-sent events to the response.
//
// When the response writer is an http.ResponseWriter, Apply replaces it with
// an SSEResponse wrapper, sets the event-stream and CORS headers and sends up
// to 99 "data: Message: ..." events, five seconds apart, flushing after each.
// Streaming runs synchronously: net/http does not allow a ResponseWriter to be
// used after its handler has returned, so the events must be written before
// Apply returns, not from a detached goroutine. Streaming stops early when a
// write fails.
//
// For any other writer type Apply only prints the type and returns.
func (handle SSEClient) Apply(req *revel.Request, resp *revel.Response) {
	w, ok := resp.GetWriter().(http.ResponseWriter)
	if !ok {
		// NOTE(review): presumably a fasthttp.responseBodyWriter when revel
		// runs on the fasthttp engine — TODO confirm.
		fmt.Printf("OOPS %T\n", resp.GetWriter())
		return
	}
	fmt.Printf("SSE SERVE HTTP %T\n", w)
	resp.SetWriter(SSEResponse{
		writer: w,
	})

	// Make sure that the writer supports flushing.
	f, ok := w.(http.Flusher)
	if !ok {
		fmt.Println("Streaming unsupported!", http.StatusInternalServerError)
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}
	fmt.Println("Streaming supported")

	// Set the headers related to event streaming.
	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("Transfer-Encoding", "chunked")
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Access-Control-Allow-Headers", "Cache-Control")
	hdr.Set("Access-Control-Allow-Credentials", "true")

	// The loop is bounded so this can be used in development mode: the page
	// uses EventSource, which reconnects after the connection is closed.
	for i := 1; i < 100; i++ {
		msg := fmt.Sprintf("%d - the time is %v", i, time.Now())
		if _, err := fmt.Fprintf(w, "data: Message: %s\n\n", msg); err != nil {
			// A failed write means the stream can no longer be delivered.
			fmt.Println("stream write failed:", err)
			return
		}
		fmt.Println(msg)
		// Flush so the event is sent now instead of being buffered.
		f.Flush()
		time.Sleep(5 * time.Second)
	}
}
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>EventSource Demo</title>
  <script src="/js/jquery-2.2.4.min.js"></script>
  <!-- Styles and the page script live inside <head>; placing them between
       </head> and <body> is a parse error. -->
  <style>
    * {
      box-sizing: border-box;
    }
    body {
      background-color: #eee;
      font: 100%/1.5 "Source Sans Pro", "Helvetica Neue", Helvetica, sans-serif;
      padding-top: 2em;
    }
    .container {
      max-width: 720px;
      margin: 0 auto;
      padding: 20px;
      background-color: #fff;
      color: #333;
      border: 1px solid #ddd;
    }
    h1 {
      margin-top: 0;
    }
    #messages {
      max-height: 480px;
      overflow-y: auto;
    }
  </style>
  <script>
    // Runs once the DOM is ready: opens the event stream and prepends every
    // received message to the #messages list.
    $(function() {
      var source = new EventSource('/events/astalavistababy/testdrive'); // change to correct endpoint
      source.onopen = function (event) {
        console.log("eventsource connection open");
      };
      source.onerror = function(event) {
        // readyState 0 is EventSource.CONNECTING: the browser is retrying.
        if (event.target.readyState === 0) {
          console.log("reconnecting to eventsource");
        } else {
          console.log("eventsource error", event);
        }
      };
      source.onmessage = function(event) {
        console.log(event.data);
        // Create the entry hidden, insert it at the top, then reveal it.
        $('<div>', {
          text: event.data,
          css: {
            display: "none"
          }
        })
          .prependTo("#messages")
          .show("fast");
      };
    });
  </script>
</head>
<body>
  <div class="container">
    <h1>Messages</h1>
    <div id="messages"></div>
  </div>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment