- Thread-Safe
- No need for any dependency.
Last active
May 2, 2022 19:37
-
-
Save Jerry0420/b9bcff824c43c5bc5ab9ba4da1cfbd34 to your computer and use it in GitHub Desktop.
A Pub-Sub tool written in Golang.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"sync" | |
"log" | |
) | |
type Event map[string]interface{} | |
type Broker struct { | |
consumers map[string]map[chan Event]bool | |
logger *log.Logger | |
sync.Mutex | |
} | |
func NewBroker(logger *log.Logger) *Broker { | |
return &Broker{ | |
consumers: make(map[string]map[chan Event]bool), | |
logger: logger, | |
} | |
} | |
func (broker *Broker) Subscribe(topicName string) chan Event { | |
broker.Lock() | |
defer broker.Unlock() | |
if topicName == "" { | |
broker.logger.Fatalln("topic name can't be empty!") | |
} | |
consumerChan := make(chan Event) | |
consumerChans, ok := broker.consumers[topicName] | |
if !ok { | |
broker.consumers[topicName] = map[chan Event]bool{consumerChan: true} | |
} else { | |
consumerChans[consumerChan] = true | |
} | |
return consumerChan | |
} | |
func (broker *Broker) UnsubscribeConsumer(topicName string, consumerChan chan Event) { | |
broker.Lock() | |
defer broker.Unlock() | |
close(consumerChan) | |
delete(broker.consumers[topicName], consumerChan) | |
} | |
func (broker *Broker) UnsubscribeTopic(topicName string) { | |
broker.Lock() | |
defer broker.Unlock() | |
for consumerChan, _ := range broker.consumers[topicName] { | |
close(consumerChan) | |
} | |
delete(broker.consumers, topicName) | |
} | |
func (broker *Broker) CloseAll() { | |
broker.Lock() | |
defer broker.Unlock() | |
for _, consumerChans := range broker.consumers { | |
for consumerChan, _ := range consumerChans { | |
close(consumerChan) | |
} | |
} | |
} | |
func (broker *Broker) Publish(topicName string, event Event) { | |
broker.Lock() | |
defer broker.Unlock() | |
for consumerChan, _ := range broker.consumers[topicName] { | |
// broadcasting... | |
consumerChan <- event | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"net/http" | |
) | |
func sub(broker *Broker) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
w.Header().Set("Access-Control-Allow-Origin", "*") | |
w.Header().Set("Access-Control-Allow-Headers", "Content-Type") | |
w.Header().Set("Content-Type", "text/event-stream") | |
w.Header().Set("Cache-Control", "no-cache") | |
w.Header().Set("Connection", "keep-alive") | |
flusher, ok := w.(http.Flusher) | |
if !ok { | |
fmt.Fprintln(w, "sse not support.") | |
return | |
} | |
consumerChan := broker.Subscribe("some-topic-name") // topic name | |
defer broker.UnsubscribeConsumer("some-topic-name", consumerChan) | |
for { | |
select { | |
case event := <-consumerChan: // event is the received data. | |
// event = map[string]interface{}{"hello": "world"} | |
if event["hello"].(string) == "world" { | |
var flushedData bytes.Buffer | |
json.NewEncoder(&flushedData).Encode(event) | |
fmt.Fprintf(w, "data: %v\n\n", flushedData.String()) | |
flusher.Flush() | |
} | |
case <-r.Context().Done(): | |
return | |
} | |
} | |
}) | |
} | |
func pub(broker *Broker) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
go broker.Publish( | |
"some-topic-name", | |
map[string]interface{}{"hello": "world"}, //Any data can be sent to the broker. | |
) | |
fmt.Fprintln(w, "This is pub.") | |
}) | |
} | |
func main() { | |
logger := log.Default() | |
broker := NewBroker(logger) | |
defer broker.CloseAll() | |
mux := http.NewServeMux() | |
mux.Handle("/sub", sub(broker)) | |
mux.Handle("/pub", pub(broker)) | |
log.Fatal(http.ListenAndServe("0.0.0.0:8080", mux)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment