Skip to content

Instantly share code, notes, and snippets.

@mirzaakhena
Created July 27, 2020 08:44
Show Gist options
  • Save mirzaakhena/0936851fd2812c74da74940eaab746d7 to your computer and use it in GitHub Desktop.
Save mirzaakhena/0936851fd2812c74da74940eaab746d7 to your computer and use it in GitHub Desktop.
Server-Sent Events (SSE) server implementation in Go
package main
import (
"fmt"
"net/http"
"sync"
"github.com/gin-gonic/gin"
)
// subscription is the per-subscriber state tracked by a NotificationCenter.
type subscription struct {
	messages chan interface{} // carries messages to the subscriber; closed by Unsubscribe
	done     chan struct{}    // closed by Unsubscribe to release senders blocked in Notify
	senders  sync.WaitGroup   // counts Notify calls currently targeting this subscription
}

// NotificationCenter routes messages to subscribers identified by a string
// id. Every subscriber owns one unbuffered channel: Notify sends on it and
// WaitForMessage exposes its receiving end.
//
// All methods are safe for concurrent use. The mutex is held only while the
// subscriber table is read or modified, never while a message is being
// delivered, so a slow or departed subscriber cannot stall other ids.
type NotificationCenter struct {
	subscribersMu sync.Mutex               // guards subscribers
	subscribers   map[string]*subscription // keyed by subscriber id
}

// NewNotificationCenter returns an empty NotificationCenter ready for use.
func NewNotificationCenter() *NotificationCenter {
	return &NotificationCenter{
		subscribers: make(map[string]*subscription),
	}
}

// WaitForMessage returns the receive-only channel on which messages for id
// arrive. The call itself does not block; the caller blocks by receiving from
// the returned channel. The channel is closed once id is unsubscribed.
//
// If id is not subscribed the result is a nil channel, and receiving from a
// nil channel blocks forever.
func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} {
	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if sub, ok := nc.subscribers[id]; ok {
		return sub.messages
	}
	return nil
}

// Subscribe registers id as a subscriber. It returns an error if id is
// already registered.
func (nc *NotificationCenter) Subscribe(id string) error {
	fmt.Printf(">>>> subscribe %s\n", id)

	nc.subscribersMu.Lock()
	defer nc.subscribersMu.Unlock()

	if _, exists := nc.subscribers[id]; exists {
		return fmt.Errorf("outlet %s already registered", id)
	}
	if nc.subscribers == nil {
		// Lets a zero-value NotificationCenter accept subscribers.
		nc.subscribers = make(map[string]*subscription)
	}
	nc.subscribers[id] = &subscription{
		messages: make(chan interface{}),
		done:     make(chan struct{}),
	}
	return nil
}

// Unsubscribe removes id and closes its message channel. Notify calls that
// are still waiting to deliver to id are released and return an error. It
// returns an error if id is not registered.
func (nc *NotificationCenter) Unsubscribe(id string) error {
	fmt.Printf(">>>> unsubscribe %s\n", id)

	nc.subscribersMu.Lock()
	sub, exists := nc.subscribers[id]
	if exists {
		delete(nc.subscribers, id)
	}
	nc.subscribersMu.Unlock()

	if !exists {
		return fmt.Errorf("outlet %s is not registered yet", id)
	}

	// Wake blocked senders first and wait for them to leave; only then is it
	// safe to close the message channel without risking a send on a closed
	// channel. No new sender can appear because the entry is already gone
	// from the table.
	close(sub.done)
	sub.senders.Wait()
	close(sub.messages)
	return nil
}

// Notify delivers message to the subscriber registered under id. It blocks
// until the subscriber receives the message or is unsubscribed. It returns an
// error if id is not registered, or stops being registered before delivery.
func (nc *NotificationCenter) Notify(id string, message interface{}) error {
	fmt.Printf(">>>> send message to %s\n", id)

	nc.subscribersMu.Lock()
	sub, exists := nc.subscribers[id]
	if exists {
		// Registered under the lock so Unsubscribe is guaranteed to see this
		// sender before it waits.
		sub.senders.Add(1)
	}
	nc.subscribersMu.Unlock()

	if !exists {
		return fmt.Errorf("outlet %s is not registered", id)
	}
	defer sub.senders.Done()

	// The send happens outside the lock so that a subscriber that stopped
	// receiving cannot wedge the whole center.
	select {
	case sub.messages <- message:
		return nil
	case <-sub.done:
		return fmt.Errorf("outlet %s is not registered", id)
	}
}
// handleSSE returns a gin handler that subscribes the "id" path parameter to
// nc and streams every message sent to that id to the client as a
// Server-Sent Event named "message". The stream runs until the request
// context is cancelled or the subscriber's channel is closed; the id is
// unsubscribed when the handler returns.
//
// If the id cannot be subscribed the handler answers 400 with the error in a
// JSON body and does not start a stream.
func handleSSE(nc *NotificationCenter) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		id := ctx.Param("id")

		if err := nc.Subscribe(id); err != nil {
			ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()})
			return
		}

		// By the time this runs the event stream has been started or the
		// client is gone, so a JSON body or a new status code can no longer
		// be delivered; failures are logged instead of written to the
		// response.
		defer func() {
			if err := nc.Unsubscribe(id); err != nil {
				fmt.Printf(">>>> unsubscribe %s failed: %v\n", id, err)
			}
		}()

		// Resolve both channels once instead of on every loop iteration.
		messages := nc.WaitForMessage(id)
		done := ctx.Request.Context().Done()

		for {
			select {
			case message, ok := <-messages:
				if !ok {
					// Channel was closed: nothing more will arrive.
					return
				}
				ctx.SSEvent("message", message)
				// Push the event to the client immediately rather than
				// leaving it in the response buffer.
				ctx.Writer.Flush()
			case <-done:
				return
			}
		}
	}
}
// messageHandler returns a gin handler that sends the greeting
// "Hello <id>" to the subscriber named by the "id" path parameter.
//
// It answers 200 with a confirmation message when nc.Notify succeeds, and 400
// with the error text when it fails. Both responses are JSON objects with a
// single "message" field.
func messageHandler(nc *NotificationCenter) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		id := ctx.Param("id")

		// Assume success; overwritten below if the notification fails.
		status := http.StatusOK
		text := fmt.Sprintf("message send to %s", id)

		if err := nc.Notify(id, fmt.Sprintf("Hello %s", id)); err != nil {
			status, text = http.StatusBadRequest, err.Error()
		}

		ctx.JSON(status, map[string]interface{}{"message": text})
	}
}
// main wires the two routes to a shared NotificationCenter and serves HTTP
// on port 3000:
//
//	GET /handshake/:id  opens the event stream for id
//	GET /message/:id    sends a greeting to id's open stream
func main() {
	r := gin.Default()
	nc := NewNotificationCenter()

	r.GET("/message/:id", messageHandler(nc))
	r.GET("/handshake/:id", handleSSE(nc))

	// Run only returns on failure (for example when the port is already in
	// use); surface it instead of exiting silently with status 0.
	if err := r.Run(":3000"); err != nil {
		panic(fmt.Errorf("running HTTP server on :3000: %v", err))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment