Last active
February 6, 2025 22:57
-
-
Save moshaad7/176bc7f0dea21585651a1f83b44e203a to your computer and use it in GitHub Desktop.
LLD_PUB_SUB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Directory Structure: | |
└── ./ | |
└── faltu | |
├── main.go | |
├── msg_impl.go | |
├── msg_store_in_mem.go | |
├── msg_store.go | |
├── msg.go | |
├── pubsub_impl.go | |
├── pubsub.go | |
├── subscriber_manager_impl.go | |
├── subscriber_manager.go | |
├── subscriber_sleeping.go | |
├── subscriber.go | |
├── topic_impl.go | |
├── topic_manager_impl.go | |
├── topic_manager.go | |
├── topic_single_partition.go | |
├── topic.go | |
├── worker_pool_impl.go | |
└── worker_pool.go | |
--- | |
File: /faltu/main.go | |
--- | |
package main | |
import ( | |
"fmt" | |
) | |
func main() { | |
fmt.Println("Hello World!") | |
} | |
--- | |
File: /faltu/msg_impl.go | |
--- | |
package main | |
import "fmt" | |
type MessageImpl struct { | |
Key string | |
Value string | |
} | |
func NewMessageImpl(key string, value string) Message { | |
return &MessageImpl{ | |
Key: key, | |
Value: value, | |
} | |
} | |
func (m *MessageImpl) String() string { | |
return fmt.Sprintf("{Key: %v, Value: %v}", m.Key, m.Value) | |
} | |
func (m *MessageImpl) DeepCopy() Message { | |
return &MessageImpl{ | |
Key: m.Key, | |
Value: m.Value, | |
} | |
} | |
--- | |
File: /faltu/msg_store_in_mem.go | |
--- | |
package main | |
import "sync" | |
type InMemMsgStore struct { | |
m sync.RWMutex | |
messages []Message | |
} | |
func newInMemMsgStore() MessageStore { | |
return &InMemMsgStore{ | |
m: sync.RWMutex{}, | |
messages: make([]Message, 0), | |
} | |
} | |
func (s *InMemMsgStore) Append(msg Message) { | |
s.m.Lock() | |
defer s.m.Unlock() | |
s.messages = append(s.messages, msg) | |
} | |
func (s *InMemMsgStore) Offset() int { | |
s.m.Lock() | |
defer s.m.Unlock() | |
return len(s.messages) - 1 | |
} | |
--- | |
File: /faltu/msg_store.go | |
--- | |
package main | |
import "fmt" | |
type MessageStore interface { | |
Offset() int | |
Append(msg Message) | |
} | |
type MsgStoreType string | |
var ( | |
MsgStoreTypeInMem MsgStoreType = "IN_MEM" | |
) | |
func NewMsgStore(msgStoreType MsgStoreType) (MessageStore, error) { | |
switch msgStoreType { | |
case MsgStoreTypeInMem: | |
return newInMemMsgStore(), nil | |
default: | |
return nil, fmt.Errorf("invalid message store type: %v", string(msgStoreType)) | |
} | |
} | |
--- | |
File: /faltu/msg.go | |
--- | |
package main | |
type Message interface { | |
String() string | |
DeepCopy() Message | |
} | |
--- | |
File: /faltu/pubsub_impl.go | |
--- | |
package main | |
import "sync" | |
type PubSubServiceImpl struct { | |
topicsManger TopicsManager | |
subsManager SubscribersManager | |
workerPool WorkerPool | |
} | |
var ( | |
pb PubSubService | |
pbOnce sync.Once | |
) | |
func newPubSubServiceImpl() PubSubService { | |
return &PubSubServiceImpl{ | |
topicsManger: NewTopicsManagerImpl(), | |
subsManager: newSubscribersManagerImpl(), | |
workerPool: newWorkerPoolImpl(10, 100), | |
} | |
} | |
func GetPubSubServiceInstance() PubSubService { | |
pbOnce.Do(func() { | |
pb = newPubSubServiceImpl() | |
}) | |
return pb | |
} | |
func (pb *PubSubServiceImpl) CreateTopic(name string) Topic { | |
return pb.topicsManger.CreateTopic(name) | |
} | |
func (pb *PubSubServiceImpl) notifySubscribers(topicName string, msg Message) { | |
subscribers := pb.subsManager.GetSubs(topicName) | |
for _, subscriber := range subscribers { | |
// Capture subscriber and message for this iteration | |
sub := subscriber | |
m := msg.DeepCopy() // if necessary, so each subscriber gets its own copy | |
pb.workerPool.SubmitWork(func() { | |
sub.Consume(m) | |
}) | |
} | |
} | |
func (pb *PubSubServiceImpl) Publish(topicName string, msg Message) { | |
pb.topicsManger.Publish(topicName, msg) | |
} | |
func (pb *PubSubServiceImpl) Subscribe(subscriber Subscriber, topicName string) { | |
pb.subsManager.Subscribe(topicName, subscriber) | |
} | |
func (pb *PubSubServiceImpl) Unsubscribe(topicName string, subscriber Subscriber) { | |
pb.subsManager.Unsubscribe(topicName, subscriber) | |
} | |
--- | |
File: /faltu/pubsub.go | |
--- | |
package main | |
type PubSubService interface { | |
CreateTopic(name string) Topic | |
// Publish msg to a kafka topic | |
Publish(topicName string, msg Message) | |
// Register Subscriber | |
Subscribe(subscriber Subscriber, topicName string) | |
Unsubscribe(topicName string, subscriber Subscriber) | |
} | |
--- | |
File: /faltu/subscriber_manager_impl.go | |
--- | |
package main | |
import "sync" | |
type SubscribersManagerImpl struct { | |
m sync.RWMutex | |
subscribers map[string][]Subscriber | |
inverseIndex map[string]map[string]int // topicName => subscriberID => Index in subscribers[topicName] list | |
} | |
func newSubscribersManagerImpl() SubscribersManager { | |
return &SubscribersManagerImpl{ | |
m: sync.RWMutex{}, | |
subscribers: make(map[string][]Subscriber), | |
inverseIndex: make(map[string]map[string]int), | |
} | |
} | |
func (s *SubscribersManagerImpl) isSubscribedLOCKED(id, topicName string) bool { | |
if _, found := s.inverseIndex[topicName]; !found { | |
s.inverseIndex[topicName] = make(map[string]int) | |
} else { | |
if _, found := s.inverseIndex[topicName][id]; found { | |
// Already subscribed to topic | |
return true | |
} | |
} | |
return false | |
} | |
func (s *SubscribersManagerImpl) Subscribe(topicName string, subscriber Subscriber) { | |
s.m.Lock() | |
defer s.m.Unlock() | |
if s.isSubscribedLOCKED(subscriber.ID(), topicName) { | |
return | |
} | |
if _, found := s.subscribers[topicName]; !found { | |
s.subscribers[topicName] = make([]Subscriber, 0) | |
} | |
s.subscribers[topicName] = append(s.subscribers[topicName], subscriber) | |
s.inverseIndex[topicName][subscriber.ID()] = len(s.subscribers[topicName]) - 1 | |
} | |
func (s *SubscribersManagerImpl) Unsubscribe(topicName string, subscriber Subscriber) { | |
s.m.Lock() | |
defer s.m.Unlock() | |
if !s.isSubscribedLOCKED(subscriber.ID(), topicName) { | |
return | |
} | |
idx := s.inverseIndex[topicName][subscriber.ID()] | |
s.subscribers[topicName] = append(s.subscribers[topicName][0:idx], s.subscribers[topicName][idx+1:]...) | |
} | |
func (s *SubscribersManagerImpl) GetSubs(topicName string) []Subscriber { | |
s.m.RLock() | |
defer s.m.RUnlock() | |
numSubs := len(s.subscribers[topicName]) | |
rv := make([]Subscriber, 0, numSubs) | |
for _, sub := range s.subscribers[topicName] { | |
rv = append(rv, sub) | |
} | |
return rv | |
} | |
--- | |
File: /faltu/subscriber_manager.go | |
--- | |
package main | |
type SubscribersManager interface { | |
Subscribe(topicName string, subscriber Subscriber) | |
Unsubscribe(topicName string, subscriber Subscriber) | |
GetSubs(topicName string) []Subscriber | |
} | |
--- | |
File: /faltu/subscriber_sleeping.go | |
--- | |
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
type SleepingSubscriber struct { | |
Id string | |
SleepDuration time.Duration | |
} | |
func NewSleepingSubscriber(id string, duration time.Duration) *SleepingSubscriber { | |
return &SleepingSubscriber{ | |
Id: id, | |
SleepDuration: duration, | |
} | |
} | |
func (s *SleepingSubscriber) ID() string { | |
return s.Id | |
} | |
func (s *SleepingSubscriber) Consume(msg Message) error { | |
fmt.Println("Consuming Msg: %v", msg) | |
time.Sleep(s.SleepDuration) | |
fmt.Println("Consumed Msg: %v", msg) | |
return nil | |
} | |
--- | |
File: /faltu/subscriber.go | |
--- | |
package main | |
type Subscriber interface { | |
ID() string | |
Consume(msg Message) error | |
} | |
--- | |
File: /faltu/topic_impl.go | |
--- | |
package main | |
import ( | |
"sync" | |
"github.com/google/uuid" | |
) | |
type TopicImpl struct { | |
ID uuid.UUID | |
Name string | |
m sync.RWMutex | |
MsgStore MessageStore | |
} | |
func NewTopic(name string) Topic { | |
return &TopicImpl{ | |
ID: uuid.New(), | |
Name: name, | |
MsgStore: newInMemMsgStore(), | |
} | |
} | |
func (t *TopicImpl) Publish(msg Message) { | |
t.MsgStore.Append(msg) | |
} | |
--- | |
File: /faltu/topic_manager_impl.go | |
--- | |
package main | |
import "sync" | |
type TopicsManangerImpl struct { | |
m sync.RWMutex | |
topics map[string]Topic | |
} | |
func NewTopicsManagerImpl() TopicsManager { | |
return &TopicsManangerImpl{ | |
m: sync.RWMutex{}, | |
topics: make(map[string]Topic), | |
} | |
} | |
func (t *TopicsManangerImpl) CreateTopic(name string) Topic { | |
t.m.Lock() | |
defer t.m.Unlock() | |
newTopic := NewTopic(name) | |
t.topics[name] = newTopic | |
return newTopic | |
} | |
func (t *TopicsManangerImpl) DeleteTopic(name string) { | |
t.m.Lock() | |
defer t.m.Unlock() | |
delete(t.topics, name) | |
} | |
func (t *TopicsManangerImpl) Publish(topicName string, msg Message) { | |
t.m.Lock() | |
defer t.m.Unlock() | |
topic, found := t.topics[topicName] | |
if found { | |
topic.Publish(msg) | |
} | |
} | |
--- | |
File: /faltu/topic_manager.go | |
--- | |
package main | |
type TopicsManager interface { | |
CreateTopic(name string) Topic | |
DeleteTopic(name string) | |
Publish(topicName string, msg Message) | |
} | |
--- | |
File: /faltu/topic_single_partition.go | |
--- | |
package main | |
import ( | |
"fmt" | |
"github.com/google/uuid" | |
) | |
type SinglePartitionTopic struct { | |
ID uuid.UUID | |
Name string | |
MsgStore MessageStore | |
} | |
func NewSinglePartitionTopic(name string, msgStoreType MsgStoreType) (Topic, error) { | |
msgStore, err := NewMsgStore(msgStoreType) | |
if err != nil { | |
return nil, fmt.Errorf("failed to create msg store, err: %w", err) | |
} | |
return &SinglePartitionTopic{ | |
ID: uuid.New(), | |
Name: name, | |
MsgStore: msgStore, | |
}, nil | |
} | |
func (t *SinglePartitionTopic) Publish(msg Message) { | |
t.MsgStore.Append(msg) | |
} | |
func (t *SinglePartitionTopic) Offset() int { | |
return t.MsgStore.Offset() | |
} | |
--- | |
File: /faltu/topic.go | |
--- | |
package main | |
type Topic interface { | |
Offset() int | |
Publish(msg Message) | |
} | |
--- | |
File: /faltu/worker_pool_impl.go | |
--- | |
package main | |
import "sync" | |
type WorkerPoolImpl struct { | |
numWorkers int | |
workQueue chan func() | |
wg *sync.WaitGroup | |
once sync.Once | |
mu sync.RWMutex | |
stopped bool | |
} | |
func newWorkerPoolImpl(numWorkers, bufferSize int) WorkerPool { | |
return &WorkerPoolImpl{ | |
numWorkers: numWorkers, | |
workQueue: make(chan func(), bufferSize), | |
wg: &sync.WaitGroup{}, | |
mu: sync.RWMutex{}, | |
once: sync.Once{}, | |
stopped: false, | |
} | |
} | |
func (wp *WorkerPoolImpl) SubmitWork(work func()) { | |
wp.mu.RLock() | |
if wp.stopped == true { | |
wp.mu.RUnlock() | |
return | |
} | |
wp.mu.RUnlock() | |
wp.workQueue <- work | |
} | |
func (wp *WorkerPoolImpl) startWorker() { | |
defer wp.wg.Done() | |
for work := range wp.workQueue { | |
work() | |
} | |
} | |
func (wp *WorkerPoolImpl) Start() { | |
wp.once.Do(func() { | |
for _ = range wp.numWorkers { | |
wp.wg.Add(1) | |
go wp.startWorker() | |
} | |
}) | |
} | |
func (wp *WorkerPoolImpl) Stop() { | |
wp.mu.RLock() | |
if wp.stopped == true { | |
wp.mu.RUnlock() | |
return | |
} | |
wp.mu.RUnlock() | |
wp.mu.Lock() | |
defer wp.mu.Unlock() | |
// Signal workers that no more work will be given to them | |
// but let them finish the work at hand | |
close(wp.workQueue) | |
// wait for workers to finish work at hand | |
wp.wg.Wait() | |
wp.stopped = true | |
} | |
--- | |
File: /faltu/worker_pool.go | |
--- | |
package main | |
type WorkerPool interface { | |
SubmitWork(work func()) | |
Start() | |
Stop() | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment