A pub/sub implementation in golang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// Subscriber is a named consumer that is identified by ID and lists the
// topics it wants to receive messages for.
type Subscriber struct {
	// Name is the label printed when the subscriber receives a message.
	Name string
	// ID is the key under which a Broker stores the subscriber.
	ID int
	// Topics are the topic names the subscriber is registered for.
	Topics []string
}
func (c *Subscriber) Receive(message string) { | |
fmt.Printf("[%s]%s received message: %s\n", time.Now().Format("2006/01/02 15:04:05"), c.Name, message) | |
} | |
type Broker struct { | |
Subscribers map[int]*Subscriber | |
TopicMap map[string][]int | |
} | |
func NewBroker(topics []string) *Broker { | |
subscribers := make(map[int]*Subscriber) | |
topicMap := make(map[string][]int) | |
for _, topic := range topics { | |
var a []int | |
topicMap[topic] = a | |
} | |
return &Broker{TopicMap: topicMap, Subscribers: subscribers} | |
} | |
func (b *Broker) Register(s Subscriber) { | |
// Add the subscriber to the subscriberId-subscriber map | |
b.Subscribers[s.ID] = &s | |
// Subscribe the subscriber to each topic | |
for _, topic := range s.Topics { | |
b.TopicMap[topic] = append(b.TopicMap[topic], s.ID) | |
} | |
} | |
func (b *Broker) Broadcast(topic string, message string) { | |
// Get the list of subscribers subscribed to the topic | |
subscriberIds := b.TopicMap[topic] | |
fmt.Println(subscriberIds) | |
// Send the message to each subscriber | |
for _, subscriberId := range subscriberIds { | |
b.Subscribers[subscriberId].Receive(message) | |
} | |
} | |
// randomString returns a string of exactly length lowercase hexadecimal
// digits chosen with math/rand. It returns "" when length is zero or negative.
//
// The shared generator is deliberately not reseeded here: reseeding on every
// call resets it for all concurrent callers, and since Go 1.20 the top-level
// math/rand functions are seeded randomly at program start. The result is not
// suitable for security-sensitive use.
//
// Originally adapted from https://stackoverflow.com/a/65607935/12227177
func randomString(length int) string {
	const hexDigits = "0123456789abcdef"

	if length <= 0 {
		return ""
	}

	b := make([]byte, length)
	for i := range b {
		b[i] = hexDigits[rand.Intn(len(hexDigits))]
	}
	return string(b)
}
type Publisher struct { | |
broker *Broker | |
} | |
func (p *Publisher) Publish(topic string, message string) { | |
// Send the message to each topic | |
p.broker.Broadcast(topic, fmt.Sprintf("Topic: %s, Message: %s", topic, message)) | |
} | |
func main() { | |
topics := []string{"sports", "politics", "religion", "art"} | |
// manually instantiate subscribers | |
subscribers := []Subscriber{ | |
{ | |
Name: "Subscriber0", | |
ID: 0, | |
Topics: []string{"sports", "politics"}, | |
}, | |
{ | |
Name: "Subscriber1", | |
ID: 1, | |
Topics: []string{"religion", "politics", "sports"}, | |
}, | |
{ | |
Name: "Subscriber2", | |
ID: 2, | |
Topics: []string{"art"}, | |
}, | |
} | |
// create a new broker and register subscribers | |
newsBroker := NewBroker(topics) | |
for _, subscriber := range subscribers { | |
newsBroker.Register(subscriber) | |
} | |
// create a new publisher | |
newsPublisher := Publisher{broker: newsBroker} | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
for i, topic := range topics { | |
go func(ctx context.Context, topic string, idx int) { | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
ticker := time.NewTicker(time.Duration(idx*5) * time.Second) | |
for _ = range ticker.C { | |
newsPublisher.Publish(topic, randomString(10)) | |
} | |
} | |
} | |
}(ctx, topic, i+1) | |
} | |
time.Sleep(time.Second * 30) | |
cancel() // Stop goroutine | |
fmt.Println("Quitting") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment