Skip to content

Instantly share code, notes, and snippets.

@Jerry0420
Last active May 2, 2022 19:37
Show Gist options
  • Save Jerry0420/b9bcff824c43c5bc5ab9ba4da1cfbd34 to your computer and use it in GitHub Desktop.
Save Jerry0420/b9bcff824c43c5bc5ab9ba4da1cfbd34 to your computer and use it in GitHub Desktop.
A pub-sub (publish/subscribe) broker written in Go.
  • Thread-Safe
  • No external dependencies (standard library only).
package main
import (
"sync"
"log"
)
// Event is the payload passed from publishers to subscribers: a free-form map
// from string keys to arbitrary values.
type Event map[string]interface{}
// Broker is an in-memory publish/subscribe hub. The embedded sync.Mutex is
// taken by the Broker methods in this file before they read or modify
// consumers.
type Broker struct {
// consumers maps a topic name to the set of channels subscribed to it.
consumers map[string]map[chan Event]bool
// logger reports fatal misuse, such as subscribing with an empty topic name.
logger *log.Logger
sync.Mutex
}
// NewBroker returns an empty Broker that reports fatal misuse through logger.
//
// A nil logger is replaced with log.Default() so that the broker never
// dereferences a nil *log.Logger when it needs to report an error.
func NewBroker(logger *log.Logger) *Broker {
	if logger == nil {
		logger = log.Default()
	}
	return &Broker{
		consumers: make(map[string]map[chan Event]bool),
		logger:    logger,
	}
}
// Subscribe registers a new consumer on topicName and returns the unbuffered
// channel on which that consumer will receive published events.
//
// An empty topicName is treated as fatal: it is reported through the broker's
// logger with Fatalln, which terminates the process.
func (broker *Broker) Subscribe(topicName string) chan Event {
	broker.Lock()
	defer broker.Unlock()

	if topicName == "" {
		broker.logger.Fatalln("topic name can't be empty!")
	}

	ch := make(chan Event)
	if subscribers, exists := broker.consumers[topicName]; exists {
		// Topic already known: add the channel to its subscriber set.
		subscribers[ch] = true
		return ch
	}

	// First subscriber for this topic: create the set with the channel in it.
	broker.consumers[topicName] = map[chan Event]bool{ch: true}
	return ch
}
// UnsubscribeConsumer removes consumerChan from topicName and closes it.
//
// The call is a no-op when consumerChan is not currently registered under
// topicName (unknown topic, nil channel, or a channel that was already
// unsubscribed), so a channel is never closed twice and a channel that is
// still registered elsewhere is never closed behind the broker's back.
//
// While waiting for the broker lock the method keeps receiving from
// consumerChan and discards whatever arrives. Publish sends while holding the
// lock, so without draining, a publisher blocked on this very channel and an
// unsubscriber blocked on the lock would wait for each other forever.
func (broker *Broker) UnsubscribeConsumer(topicName string, consumerChan chan Event) {
	// Acquire the lock on a helper goroutine so this goroutine stays free to
	// drain consumerChan. A sync.Mutex may be unlocked by a goroutine other
	// than the one that locked it.
	locked := make(chan struct{})
	go func() {
		broker.Lock()
		close(locked)
	}()

	pending := consumerChan
	for acquired := false; !acquired; {
		select {
		case _, open := <-pending:
			if !open {
				// Already closed: stop selecting on it (a nil channel is
				// never ready) to avoid spinning on zero values.
				pending = nil
			}
		case <-locked:
			acquired = true
		}
	}
	defer broker.Unlock()

	subscribers := broker.consumers[topicName]
	if _, registered := subscribers[consumerChan]; !registered {
		return
	}

	delete(subscribers, consumerChan)
	close(consumerChan)

	// Drop the topic entry once its last subscriber is gone so empty sets do
	// not accumulate; Subscribe recreates the entry on demand.
	if len(subscribers) == 0 {
		delete(broker.consumers, topicName)
	}
}
// UnsubscribeTopic closes every channel subscribed to topicName and then
// removes the topic from the broker. An unknown topic has no subscribers, so
// nothing is closed.
func (broker *Broker) UnsubscribeTopic(topicName string) {
	broker.Lock()
	defer broker.Unlock()

	subscribers := broker.consumers[topicName]
	for ch := range subscribers {
		close(ch)
	}
	delete(broker.consumers, topicName)
}
// CloseAll closes every subscriber channel of every topic and forgets them.
//
// Each topic is removed from the broker as soon as its channels are closed.
// Leaving closed channels registered would make a second CloseAll panic with
// "close of closed channel" and a later Publish panic with "send on closed
// channel".
func (broker *Broker) CloseAll() {
	broker.Lock()
	defer broker.Unlock()

	for topicName, subscribers := range broker.consumers {
		for ch := range subscribers {
			close(ch)
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(broker.consumers, topicName)
	}
}
// Publish broadcasts event to every channel currently subscribed to
// topicName. Publishing to a topic without subscribers does nothing.
//
// The broker lock is held for the whole broadcast, and each send blocks until
// that subscriber receives the event. One subscriber that is not receiving
// therefore stalls the publisher and every other Broker method waiting on the
// lock.
func (broker *Broker) Publish(topicName string, event Event) {
	broker.Lock()
	defer broker.Unlock()

	subscribers := broker.consumers[topicName]
	for ch := range subscribers {
		ch <- event
	}
}
package main
import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)
// sub returns a handler that streams events from the broker to the client as
// Server-Sent Events (SSE).
//
// Each request subscribes to the topic, forwards every event whose "hello"
// value is the string "world" as a JSON-encoded "data:" frame, and
// unsubscribes when the handler returns. The handler returns when the request
// context is done, when the subscription channel is closed, or when writing
// to the client fails.
func sub(broker *Broker) http.Handler {
	const topicName = "some-topic-name"

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Check streaming support before any SSE header is set, so the
		// failure is reported as a plain-text 500 rather than as a 200
		// response labelled text/event-stream.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "sse not support.", http.StatusInternalServerError)
			return
		}

		header := w.Header()
		header.Set("Access-Control-Allow-Origin", "*")
		header.Set("Access-Control-Allow-Headers", "Content-Type")
		header.Set("Content-Type", "text/event-stream")
		header.Set("Cache-Control", "no-cache")
		header.Set("Connection", "keep-alive")

		consumerChan := broker.Subscribe(topicName)
		defer broker.UnsubscribeConsumer(topicName, consumerChan)

		ctx := r.Context()
		for {
			select {
			case event, open := <-consumerChan:
				if !open {
					// The broker closed the subscription; a closed channel
					// would otherwise yield nil events in a tight loop.
					return
				}

				// Checked assertion: an event without a string "hello" value
				// is skipped instead of panicking the handler.
				if greeting, isString := event["hello"].(string); !isString || greeting != "world" {
					continue
				}

				var flushedData bytes.Buffer
				if err := json.NewEncoder(&flushedData).Encode(event); err != nil {
					log.Printf("sub: encoding event: %v", err)
					continue
				}
				if _, err := fmt.Fprintf(w, "data: %v\n\n", flushedData.String()); err != nil {
					// The client can no longer be written to; stop streaming.
					return
				}
				flusher.Flush()
			case <-ctx.Done():
				return
			}
		}
	})
}
// pub returns a handler that publishes a fixed demo event to the broker and
// replies with a short confirmation line.
//
// Publishing runs on its own goroutine, so the HTTP response does not wait
// for the event to be delivered to subscribers.
func pub(broker *Broker) http.Handler {
	handle := func(w http.ResponseWriter, r *http.Request) {
		// Any data can be sent to the broker.
		payload := map[string]interface{}{"hello": "world"}
		go broker.Publish("some-topic-name", payload)

		fmt.Fprintln(w, "This is pub.")
	}
	return http.HandlerFunc(handle)
}
// main wires the broker to the /sub and /pub endpoints and serves them on
// port 8080 until the HTTP server stops.
func main() {
	logger := log.Default()
	broker := NewBroker(logger)

	mux := http.NewServeMux()
	mux.Handle("/sub", sub(broker))
	mux.Handle("/pub", pub(broker))

	// ListenAndServe always returns a non-nil error. log.Fatal exits through
	// os.Exit, which skips deferred calls, so the broker is closed explicitly
	// before the error is reported rather than via defer.
	err := http.ListenAndServe("0.0.0.0:8080", mux)
	broker.CloseAll()
	log.Fatal(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment