Skip to content

Instantly share code, notes, and snippets.

@horsfallnathan
Created July 16, 2022 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save horsfallnathan/e0acbcb20c69c9cde53cf20fd56c3ca8 to your computer and use it in GitHub Desktop.
Save horsfallnathan/e0acbcb20c69c9cde53cf20fd56c3ca8 to your computer and use it in GitHub Desktop.
A pub/sub implementation in Go
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Subscriber is a named message consumer, identified by ID, together with the
// topics it wants to follow.
type Subscriber struct {
	// Name is the label printed whenever the subscriber receives a message.
	Name string
	// ID is the key under which a Broker stores the subscriber.
	ID int
	// Topics lists the topics the subscriber is added to on registration.
	Topics []string
}
// Receive prints message to standard output, prefixed with the current local
// time (formatted as "2006/01/02 15:04:05") and the subscriber's name.
func (c *Subscriber) Receive(message string) {
	stamp := time.Now().Format("2006/01/02 15:04:05")
	fmt.Printf("[%s]%s received message: %s\n", stamp, c.Name, message)
}
// Broker keeps track of the registered subscribers and of which subscribers
// follow which topic.
type Broker struct {
	// Subscribers maps a subscriber ID to the registered subscriber.
	Subscribers map[int]*Subscriber
	// TopicMap maps a topic name to the IDs of the subscribers following it.
	TopicMap map[string][]int
}
// NewBroker returns a Broker with no subscribers and one empty subscriber
// list for every entry of topics.
func NewBroker(topics []string) *Broker {
	broker := &Broker{
		Subscribers: make(map[int]*Subscriber),
		TopicMap:    make(map[string][]int),
	}
	// Create every topic key up front; its subscriber list starts out nil.
	for i := range topics {
		broker.TopicMap[topics[i]] = nil
	}
	return broker
}
// Register stores a copy of s under s.ID, replacing any subscriber previously
// stored under that ID, and adds the ID to every topic listed in s.Topics.
//
// An ID is added to a topic at most once, so registering the same subscriber
// again or listing a topic twice does not cause duplicate deliveries. Topic
// entries made by an earlier registration of the same ID are left in place.
// Register also works on a zero-value Broker: missing maps are created first.
func (b *Broker) Register(s Subscriber) {
	// Writing to a nil map panics, so make a zero-value Broker usable.
	if b.Subscribers == nil {
		b.Subscribers = make(map[int]*Subscriber)
	}
	if b.TopicMap == nil {
		b.TopicMap = make(map[string][]int)
	}

	// Add the subscriber to the ID-to-subscriber map.
	b.Subscribers[s.ID] = &s

	// Subscribe the subscriber to each topic it is not already following.
	for _, topic := range s.Topics {
		ids := b.TopicMap[topic]
		subscribed := false
		for _, id := range ids {
			if id == s.ID {
				subscribed = true
				break
			}
		}
		if !subscribed {
			b.TopicMap[topic] = append(ids, s.ID)
		}
	}
}
// Broadcast delivers message to every subscriber following topic, in the
// order the subscribers were added to the topic. A topic with no subscribers,
// or one the broker does not know, results in no deliveries.
func (b *Broker) Broadcast(topic string, message string) {
	for _, id := range b.TopicMap[topic] {
		// Skip IDs that have no registered subscriber: calling Receive
		// through a nil *Subscriber would panic.
		subscriber, ok := b.Subscribers[id]
		if !ok || subscriber == nil {
			continue
		}
		subscriber.Receive(message)
	}
}
// randomString returns length pseudo-random lowercase hexadecimal characters.
// It returns the empty string when length is zero or negative. The output
// comes from math/rand and is not suitable for secrets.
//
// Adapted from https://stackoverflow.com/a/65607935/12227177
func randomString(length int) string {
	if length <= 0 {
		// make would panic on a negative size; there is nothing to generate.
		return ""
	}
	// Seeding on every call keeps the output varying between runs on Go
	// releases that do not seed the global source automatically.
	rand.Seed(time.Now().UnixNano())
	// Every byte renders as two hex digits, so half the length (rounded up)
	// is all the randomness that is needed.
	b := make([]byte, (length+1)/2)
	// The error is not checked: math/rand's Read is documented to always
	// return a nil error.
	rand.Read(b)
	// An odd length leaves one surplus digit, which the slice drops.
	return fmt.Sprintf("%x", b)[:length]
}
// Publisher sends messages to subscribers by way of a Broker.
type Publisher struct {
	// broker is the Broker that every published message is handed to.
	broker *Broker
}
// Publish wraps message in a "Topic: <topic>, Message: <message>" envelope
// and asks the broker to broadcast it on topic.
func (p *Publisher) Publish(topic string, message string) {
	payload := fmt.Sprintf("Topic: %s, Message: %s", topic, message)
	p.broker.Broadcast(topic, payload)
}
// main wires up a demo: three subscribers are registered with a broker, then
// one goroutine per topic publishes a random message on that topic at a fixed
// interval (5s for the first topic, 10s for the second, and so on). After 30
// seconds the publishers are cancelled and the program exits once they have
// all returned.
func main() {
	topics := []string{"sports", "politics", "religion", "art"}

	// Manually instantiate subscribers.
	subscribers := []Subscriber{
		{
			Name:   "Subscriber0",
			ID:     0,
			Topics: []string{"sports", "politics"},
		},
		{
			Name:   "Subscriber1",
			ID:     1,
			Topics: []string{"religion", "politics", "sports"},
		},
		{
			Name:   "Subscriber2",
			ID:     2,
			Topics: []string{"art"},
		},
	}

	// Create a new broker and register subscribers.
	newsBroker := NewBroker(topics)
	for _, subscriber := range subscribers {
		newsBroker.Register(subscriber)
	}

	// Create a new publisher.
	newsPublisher := Publisher{broker: newsBroker}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Each publishing goroutine signals on done right before it returns.
	done := make(chan struct{})
	for i, topic := range topics {
		go func(ctx context.Context, topic string, idx int) {
			defer func() { done <- struct{}{} }()

			// One ticker per goroutine, released when the goroutine exits.
			ticker := time.NewTicker(time.Duration(idx*5) * time.Second)
			defer ticker.Stop()

			for {
				// Waiting on both channels in the same select is what lets
				// cancellation interrupt the wait for the next tick.
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					newsPublisher.Publish(topic, randomString(10))
				}
			}
		}(ctx, topic, i+1)
	}

	time.Sleep(time.Second * 30)
	cancel() // Stop the publishing goroutines.

	// Wait for every goroutine to finish so nothing is published after the
	// farewell line below.
	for range topics {
		<-done
	}
	fmt.Println("Quitting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment