Created
July 27, 2020 08:44
-
-
Save mirzaakhena/0936851fd2812c74da74940eaab746d7 to your computer and use it in GitHub Desktop.
Server Sent Events (SSE) Golang Server implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"net/http" | |
"sync" | |
"github.com/gin-gonic/gin" | |
) | |
// NotificationCenter ... | |
type NotificationCenter struct { | |
subscriberMessageChannelsID map[string]chan interface{} | |
subscribersMu *sync.Mutex | |
} | |
// NewNotificationCenter ... | |
func NewNotificationCenter() *NotificationCenter { | |
return &NotificationCenter{ | |
subscribersMu: &sync.Mutex{}, | |
subscriberMessageChannelsID: make(map[string]chan interface{}), | |
} | |
} | |
// WaitForMessage will blocking the process until returning the message | |
func (nc *NotificationCenter) WaitForMessage(id string) <-chan interface{} { | |
return nc.subscriberMessageChannelsID[id] | |
} | |
// Subscribe ... | |
func (nc *NotificationCenter) Subscribe(id string) error { | |
fmt.Printf(">>>> subscribe %s\n", id) | |
nc.subscribersMu.Lock() | |
defer nc.subscribersMu.Unlock() | |
if _, exist := nc.subscriberMessageChannelsID[id]; exist { | |
return fmt.Errorf("outlet %s already registered", id) | |
} | |
nc.subscriberMessageChannelsID[id] = make(chan interface{}) | |
return nil | |
} | |
// Unsubscribe ... | |
func (nc *NotificationCenter) Unsubscribe(id string) error { | |
fmt.Printf(">>>> unsubscribe %s\n", id) | |
nc.subscribersMu.Lock() | |
defer nc.subscribersMu.Unlock() | |
if _, exist := nc.subscriberMessageChannelsID[id]; !exist { | |
return fmt.Errorf("outlet %s is not registered yet", id) | |
} | |
close(nc.subscriberMessageChannelsID[id]) | |
delete(nc.subscriberMessageChannelsID, id) | |
return nil | |
} | |
// Notify ... | |
func (nc *NotificationCenter) Notify(id string, message interface{}) error { | |
fmt.Printf(">>>> send message to %s\n", id) | |
nc.subscribersMu.Lock() | |
defer nc.subscribersMu.Unlock() | |
if _, exist := nc.subscriberMessageChannelsID[id]; !exist { | |
return fmt.Errorf("outlet %s is not registered", id) | |
} | |
nc.subscriberMessageChannelsID[id] <- message | |
return nil | |
} | |
func handleSSE(nc *NotificationCenter) gin.HandlerFunc { | |
return func(ctx *gin.Context) { | |
id := ctx.Param("id") | |
// subscribe the id and channel | |
if err := nc.Subscribe(id); err != nil { | |
ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
return | |
} | |
// unsubscribe if exit from this method | |
defer func() { | |
if err := nc.Unsubscribe(id); err != nil { | |
ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
return | |
} | |
message := fmt.Sprintf("id %s close connection", id) | |
ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": message}) | |
}() | |
// forever loop for listening message | |
for { | |
select { | |
case message := <-nc.WaitForMessage(id): | |
ctx.SSEvent("message", message) | |
ctx.Writer.Flush() | |
case <-ctx.Request.Context().Done(): | |
return | |
} | |
} | |
} | |
} | |
func messageHandler(nc *NotificationCenter) gin.HandlerFunc { | |
return func(ctx *gin.Context) { | |
id := ctx.Param("id") | |
message := fmt.Sprintf("Hello %s", id) | |
if err := nc.Notify(id, message); err != nil { | |
ctx.JSON(http.StatusBadRequest, map[string]interface{}{"message": err.Error()}) | |
return | |
} | |
ctx.JSON(http.StatusOK, map[string]interface{}{"message": fmt.Sprintf("message send to %s", ctx.Param("id"))}) | |
} | |
} | |
func main() { | |
r := gin.Default() | |
nc := NewNotificationCenter() | |
r.GET("/message/:id", messageHandler(nc)) | |
r.GET("/handshake/:id", handleSSE(nc)) | |
r.Run(":3000") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment