Skip to content

Instantly share code, notes, and snippets.

@moshaad7
Last active February 6, 2025 22:57
Show Gist options
  • Save moshaad7/176bc7f0dea21585651a1f83b44e203a to your computer and use it in GitHub Desktop.
Save moshaad7/176bc7f0dea21585651a1f83b44e203a to your computer and use it in GitHub Desktop.
LLD_PUB_SUB
Directory Structure:
└── ./
└── faltu
├── main.go
├── msg_impl.go
├── msg_store_in_mem.go
├── msg_store.go
├── msg.go
├── pubsub_impl.go
├── pubsub.go
├── subscriber_manager_impl.go
├── subscriber_manager.go
├── subscriber_sleeping.go
├── subscriber.go
├── topic_impl.go
├── topic_manager_impl.go
├── topic_manager.go
├── topic_single_partition.go
├── topic.go
├── worker_pool_impl.go
└── worker_pool.go
---
File: /faltu/main.go
---
package main
import (
"fmt"
)
// main is the program entry point; it writes a greeting line to standard
// output and returns.
func main() {
	const greeting = "Hello World!"
	fmt.Println(greeting)
}
---
File: /faltu/msg_impl.go
---
package main
import "fmt"
// MessageImpl is a simple key/value message. Together with its String and
// DeepCopy methods it is handed out as a Message by NewMessageImpl.
type MessageImpl struct {
// Key is the message key.
Key string
// Value is the message payload.
Value string
}
// NewMessageImpl allocates a MessageImpl holding the given key and value and
// returns it as a Message.
func NewMessageImpl(key string, value string) Message {
	msg := new(MessageImpl)
	msg.Key = key
	msg.Value = value
	return msg
}
// String renders the message as "{Key: <key>, Value: <value>}".
func (m *MessageImpl) String() string {
	const layout = "{Key: %v, Value: %v}"
	return fmt.Sprintf(layout, m.Key, m.Value)
}
// DeepCopy returns a new, independent MessageImpl carrying the same Key and
// Value as the receiver.
func (m *MessageImpl) DeepCopy() Message {
	// Both fields are strings, so copying the struct value yields a copy
	// that shares no mutable state with the original.
	dup := *m
	return &dup
}
---
File: /faltu/msg_store_in_mem.go
---
package main
import "sync"
// InMemMsgStore keeps appended messages in a slice held in process memory.
type InMemMsgStore struct {
// m guards messages.
m sync.RWMutex
// messages holds every appended message in append order.
messages []Message
}
// newInMemMsgStore returns an empty InMemMsgStore as a MessageStore.
func newInMemMsgStore() MessageStore {
	// The zero value of the embedded RWMutex is ready to use, so only the
	// message slice needs explicit initialisation.
	store := new(InMemMsgStore)
	store.messages = make([]Message, 0)
	return store
}
// Append adds msg to the end of the store while holding the write lock.
func (s *InMemMsgStore) Append(msg Message) {
	s.m.Lock()
	s.messages = append(s.messages, msg)
	s.m.Unlock()
}
// Offset returns the index of the most recently appended message, or -1 when
// the store is empty.
//
// It only reads the slice length, so it takes the read lock; concurrent
// Offset calls do not block one another.
func (s *InMemMsgStore) Offset() int {
	s.m.RLock()
	defer s.m.RUnlock()
	return len(s.messages) - 1
}
---
File: /faltu/msg_store.go
---
package main
import "fmt"
// MessageStore is the storage abstraction a topic appends its messages to.
type MessageStore interface {
// Offset reports the store's current offset as an int.
Offset() int
// Append adds msg to the store.
Append(msg Message)
}
// MsgStoreType names a MessageStore implementation that NewMsgStore can
// construct.
type MsgStoreType string

// Supported message store types. These are constants so that no caller can
// reassign them at run time.
const (
	// MsgStoreTypeInMem selects the in-memory message store.
	MsgStoreTypeInMem MsgStoreType = "IN_MEM"
)
// NewMsgStore builds the MessageStore identified by msgStoreType.
//
// It returns an error, and a nil store, when msgStoreType is not one of the
// supported types.
func NewMsgStore(msgStoreType MsgStoreType) (MessageStore, error) {
	if msgStoreType == MsgStoreTypeInMem {
		return newInMemMsgStore(), nil
	}
	return nil, fmt.Errorf("invalid message store type: %v", string(msgStoreType))
}
---
File: /faltu/msg.go
---
package main
// Message is the unit of data that is published to topics and handed to
// subscribers.
type Message interface {
// String returns a textual representation of the message.
String() string
// DeepCopy returns a copy of the message as a new Message value.
DeepCopy() Message
}
---
File: /faltu/pubsub_impl.go
---
package main
import "sync"
// PubSubServiceImpl implements PubSubService by delegating to a topics
// manager, a subscribers manager and a worker pool.
type PubSubServiceImpl struct {
// topicsManger creates topics and stores published messages.
// NOTE(review): the field name is misspelled ("Manger"); renaming it
// requires touching every method that uses it.
topicsManger TopicsManager
// subsManager tracks which subscribers are registered for which topic.
subsManager SubscribersManager
// workerPool runs the per-subscriber delivery callbacks.
workerPool WorkerPool
}
// Process-wide singleton state used by GetPubSubServiceInstance.
var (
// pb holds the shared PubSubService once it has been created.
pb PubSubService
// pbOnce ensures pb is initialised exactly once.
pbOnce sync.Once
)
// newPubSubServiceImpl wires up a PubSubServiceImpl with a fresh topics
// manager, a fresh subscribers manager and a started worker pool of 10
// workers with a work queue buffer of 100.
func newPubSubServiceImpl() PubSubService {
	workers := newWorkerPoolImpl(10, 100)
	// The pool only runs submitted work after Start has been called. Without
	// this, queued deliveries would never execute and, once the queue buffer
	// filled up, every further Publish would block forever.
	workers.Start()

	return &PubSubServiceImpl{
		topicsManger: NewTopicsManagerImpl(),
		subsManager:  newSubscribersManagerImpl(),
		workerPool:   workers,
	}
}

// GetPubSubServiceInstance returns the process-wide PubSubService, creating
// it on the first call. It is safe to call from multiple goroutines.
func GetPubSubServiceInstance() PubSubService {
	pbOnce.Do(func() {
		pb = newPubSubServiceImpl()
	})
	return pb
}

// CreateTopic asks the topics manager to create a topic called name and
// returns the result.
func (pb *PubSubServiceImpl) CreateTopic(name string) Topic {
	return pb.topicsManger.CreateTopic(name)
}

// notifySubscribers schedules one delivery of msg on the worker pool for each
// subscriber currently registered for topicName.
//
// Every subscriber receives its own DeepCopy of msg so that one consumer
// cannot affect what another one sees.
func (pb *PubSubServiceImpl) notifySubscribers(topicName string, msg Message) {
	for _, subscriber := range pb.subsManager.GetSubs(topicName) {
		// Bind per-iteration copies so the closure is correct regardless of
		// the loop-variable semantics of the Go version in use.
		sub := subscriber
		m := msg.DeepCopy()
		pb.workerPool.SubmitWork(func() {
			// Delivery is fire-and-forget: there is no channel back to the
			// publisher, so a Consume error is deliberately dropped here.
			_ = sub.Consume(m)
		})
	}
}

// Publish stores msg on the topic named topicName and then fans it out to
// every subscriber registered for that topic.
//
// Deliveries run asynchronously on the worker pool. Publish itself can block
// while the pool's work queue is full. The topics manager does not report
// whether the topic exists, so subscribers registered under topicName are
// notified either way.
func (pb *PubSubServiceImpl) Publish(topicName string, msg Message) {
	pb.topicsManger.Publish(topicName, msg)
	pb.notifySubscribers(topicName, msg)
}
// Subscribe registers subscriber for the topic named topicName by forwarding
// to the subscribers manager.
//
// Note the argument order: the subscriber comes first here, whereas the
// manager (and Unsubscribe) take the topic name first.
func (pb *PubSubServiceImpl) Subscribe(subscriber Subscriber, topicName string) {
	manager := pb.subsManager
	manager.Subscribe(topicName, subscriber)
}
// Unsubscribe removes subscriber from the topic named topicName by forwarding
// to the subscribers manager.
func (pb *PubSubServiceImpl) Unsubscribe(topicName string, subscriber Subscriber) {
	manager := pb.subsManager
	manager.Unsubscribe(topicName, subscriber)
}
---
File: /faltu/pubsub.go
---
package main
// PubSubService is the top-level API of the pub/sub system: it creates
// topics, accepts published messages and manages subscriptions.
type PubSubService interface {
// CreateTopic creates a topic with the given name and returns it.
CreateTopic(name string) Topic
// Publish publishes msg to the topic named topicName.
Publish(topicName string, msg Message)
// Subscribe registers subscriber for the topic named topicName.
Subscribe(subscriber Subscriber, topicName string)
// Unsubscribe removes subscriber from the topic named topicName. Note that
// its argument order is the reverse of Subscribe's.
Unsubscribe(topicName string, subscriber Subscriber)
}
---
File: /faltu/subscriber_manager_impl.go
---
package main
import "sync"
// SubscribersManagerImpl keeps, per topic name, the list of registered
// subscribers plus an index from subscriber ID to list position.
type SubscribersManagerImpl struct {
// m guards subscribers and inverseIndex.
m sync.RWMutex
// subscribers maps a topic name to its subscribers in registration order.
subscribers map[string][]Subscriber
inverseIndex map[string]map[string]int // topicName => subscriberID => Index in subscribers[topicName] list
}
// newSubscribersManagerImpl returns an empty SubscribersManagerImpl as a
// SubscribersManager.
func newSubscribersManagerImpl() SubscribersManager {
	// The zero-value RWMutex is ready to use; only the maps need allocating.
	mgr := new(SubscribersManagerImpl)
	mgr.subscribers = make(map[string][]Subscriber)
	mgr.inverseIndex = make(map[string]map[string]int)
	return mgr
}
// isSubscribedLOCKED reports whether the subscriber with the given id is
// recorded in the inverse index for topicName.
//
// As a side effect it creates the (empty) index map for topicName when none
// exists yet; Subscribe relies on that map being present afterwards. The
// caller must hold s.m for writing.
func (s *SubscribersManagerImpl) isSubscribedLOCKED(id, topicName string) bool {
	perTopic, known := s.inverseIndex[topicName]
	if !known {
		s.inverseIndex[topicName] = make(map[string]int)
		return false
	}
	_, subscribed := perTopic[id]
	return subscribed
}
// Subscribe registers subscriber for topicName. Subscribing an ID that is
// already recorded for the topic is a no-op.
func (s *SubscribersManagerImpl) Subscribe(topicName string, subscriber Subscriber) {
	s.m.Lock()
	defer s.m.Unlock()

	id := subscriber.ID()
	// This call also guarantees s.inverseIndex[topicName] exists below.
	if s.isSubscribedLOCKED(id, topicName) {
		return
	}

	current, ok := s.subscribers[topicName]
	if !ok {
		current = make([]Subscriber, 0)
	}
	// The new subscriber lands at the end, i.e. at the old length.
	position := len(current)
	s.subscribers[topicName] = append(current, subscriber)
	s.inverseIndex[topicName][id] = position
}
// Unsubscribe removes subscriber from topicName. It is a no-op when the
// subscriber's ID is not registered for that topic.
//
// Besides removing the subscriber from the list, it also removes the
// subscriber's entry from the inverse index and re-indexes every subscriber
// that shifted left. Leaving the index stale would make a later Subscribe of
// the same ID a silent no-op and would make later Unsubscribe calls remove
// the wrong element or index out of range.
func (s *SubscribersManagerImpl) Unsubscribe(topicName string, subscriber Subscriber) {
	s.m.Lock()
	defer s.m.Unlock()

	id := subscriber.ID()
	// Reading from a missing (nil) inner map is safe and simply reports
	// "not found", so nothing is allocated for unknown topics.
	index := s.inverseIndex[topicName]
	idx, found := index[id]
	if !found {
		return
	}

	subs := s.subscribers[topicName]
	last := len(subs) - 1
	// Shift the tail left by one, preserving registration order.
	copy(subs[idx:], subs[idx+1:])
	// Clear the vacated slot so the removed subscriber is not kept alive by
	// the backing array.
	subs[last] = nil
	subs = subs[:last]
	s.subscribers[topicName] = subs

	delete(index, id)
	for i := idx; i < len(subs); i++ {
		index[subs[i].ID()] = i
	}
}
// GetSubs returns a snapshot of the subscribers registered for topicName.
//
// The returned slice is a fresh copy, so the caller may iterate it without
// holding the manager's lock. It is empty (but non-nil) for unknown topics.
func (s *SubscribersManagerImpl) GetSubs(topicName string) []Subscriber {
	s.m.RLock()
	defer s.m.RUnlock()

	current := s.subscribers[topicName]
	snapshot := make([]Subscriber, len(current))
	copy(snapshot, current)
	return snapshot
}
---
File: /faltu/subscriber_manager.go
---
package main
// SubscribersManager tracks which subscribers are registered for which topic.
type SubscribersManager interface {
// Subscribe registers subscriber for the topic named topicName.
Subscribe(topicName string, subscriber Subscriber)
// Unsubscribe removes subscriber from the topic named topicName.
Unsubscribe(topicName string, subscriber Subscriber)
// GetSubs returns the subscribers registered for the topic named topicName.
GetSubs(topicName string) []Subscriber
}
---
File: /faltu/subscriber_sleeping.go
---
package main
import (
"fmt"
"time"
)
// SleepingSubscriber is a subscriber whose Consume sleeps for a fixed
// duration between printing a start line and a finish line.
type SleepingSubscriber struct {
// Id is the identifier returned by ID.
Id string
// SleepDuration is how long Consume sleeps for each message.
SleepDuration time.Duration
}
// NewSleepingSubscriber returns a SleepingSubscriber with the given id that
// sleeps for duration on every consumed message.
func NewSleepingSubscriber(id string, duration time.Duration) *SleepingSubscriber {
	sub := new(SleepingSubscriber)
	sub.Id = id
	sub.SleepDuration = duration
	return sub
}
// ID returns the subscriber's identifier, i.e. its Id field.
func (s *SleepingSubscriber) ID() string {
	id := s.Id
	return id
}
// Consume prints a "Consuming" line for msg, sleeps for s.SleepDuration, and
// then prints a "Consumed" line. It always returns nil.
func (s *SleepingSubscriber) Consume(msg Message) error {
	// Printf (not Println) is required for the %v verb to be expanded;
	// Println would print the verb literally followed by the message.
	fmt.Printf("Consuming Msg: %v\n", msg)
	time.Sleep(s.SleepDuration)
	fmt.Printf("Consumed Msg: %v\n", msg)
	return nil
}
---
File: /faltu/subscriber.go
---
package main
// Subscriber is a consumer of messages that can be registered for topics.
type Subscriber interface {
// ID returns the subscriber's identifier; the subscribers manager uses it
// as the key for detecting duplicate subscriptions.
ID() string
// Consume handles one message and reports any failure as an error.
Consume(msg Message) error
}
---
File: /faltu/topic_impl.go
---
package main
import (
"sync"
"github.com/google/uuid"
)
// TopicImpl is a named topic backed by a single MessageStore.
type TopicImpl struct {
// ID is a UUID assigned when the topic is created.
ID uuid.UUID
// Name is the topic name.
Name string
// m is not referenced by any TopicImpl code visible in this listing.
m sync.RWMutex
// MsgStore holds the messages published to this topic.
MsgStore MessageStore
}
// NewTopic returns a topic called name with a freshly generated UUID and an
// empty in-memory message store.
func NewTopic(name string) Topic {
	topic := &TopicImpl{Name: name}
	topic.ID = uuid.New()
	topic.MsgStore = newInMemMsgStore()
	return topic
}
// Publish appends msg to the topic's message store.
func (t *TopicImpl) Publish(msg Message) {
	t.MsgStore.Append(msg)
}

// Offset returns the current offset of the topic's message store.
//
// The Topic interface requires this method; without it *TopicImpl does not
// satisfy Topic and NewTopic cannot return it.
func (t *TopicImpl) Offset() int {
	return t.MsgStore.Offset()
}
---
File: /faltu/topic_manager_impl.go
---
package main
import "sync"
// TopicsManangerImpl keeps the set of topics, keyed by topic name.
// NOTE(review): the type name is misspelled ("Mananger"); it is kept as is
// because the constructor and methods refer to it.
type TopicsManangerImpl struct {
// m guards topics.
m sync.RWMutex
// topics maps a topic name to its Topic.
topics map[string]Topic
}
// NewTopicsManagerImpl returns an empty TopicsManangerImpl as a TopicsManager.
func NewTopicsManagerImpl() TopicsManager {
	// The zero-value RWMutex is ready to use; only the map needs allocating.
	mgr := new(TopicsManangerImpl)
	mgr.topics = make(map[string]Topic)
	return mgr
}
// CreateTopic returns the topic registered under name, creating and
// registering a new one when none exists yet.
//
// Creation is idempotent: calling it again with an existing name returns the
// already registered topic instead of replacing it, so messages stored on
// that topic and handles held by earlier callers stay valid.
func (t *TopicsManangerImpl) CreateTopic(name string) Topic {
	t.m.Lock()
	defer t.m.Unlock()

	if existing, found := t.topics[name]; found {
		return existing
	}
	created := NewTopic(name)
	t.topics[name] = created
	return created
}
// DeleteTopic removes the topic registered under name, if any. Deleting an
// unknown name is a no-op.
func (t *TopicsManangerImpl) DeleteTopic(name string) {
	t.m.Lock()
	delete(t.topics, name)
	t.m.Unlock()
}
// Publish forwards msg to the topic registered under topicName. When no such
// topic exists the message is silently dropped.
//
// The topics map is only read here, so the read lock is sufficient and
// publishes to different topics do not serialise on the manager.
func (t *TopicsManangerImpl) Publish(topicName string, msg Message) {
	t.m.RLock()
	defer t.m.RUnlock()

	topic, found := t.topics[topicName]
	if !found {
		return
	}
	topic.Publish(msg)
}
---
File: /faltu/topic_manager.go
---
package main
// TopicsManager owns the set of topics and routes published messages to them.
type TopicsManager interface {
// CreateTopic creates a topic with the given name and returns it.
CreateTopic(name string) Topic
// DeleteTopic removes the topic with the given name.
DeleteTopic(name string)
// Publish publishes msg to the topic named topicName.
Publish(topicName string, msg Message)
}
---
File: /faltu/topic_single_partition.go
---
package main
import (
"fmt"
"github.com/google/uuid"
)
// SinglePartitionTopic is a named topic whose messages all go to one
// MessageStore.
type SinglePartitionTopic struct {
// ID is a UUID assigned when the topic is created.
ID uuid.UUID
// Name is the topic name.
Name string
// MsgStore holds the messages published to this topic.
MsgStore MessageStore
}
// NewSinglePartitionTopic returns a topic called name backed by a message
// store of the requested type.
//
// It returns a nil topic and a wrapped error when the message store cannot be
// created (for example, for an unsupported msgStoreType).
func NewSinglePartitionTopic(name string, msgStoreType MsgStoreType) (Topic, error) {
	store, storeErr := NewMsgStore(msgStoreType)
	if storeErr != nil {
		return nil, fmt.Errorf("failed to create msg store, err: %w", storeErr)
	}

	topic := new(SinglePartitionTopic)
	topic.ID = uuid.New()
	topic.Name = name
	topic.MsgStore = store
	return topic, nil
}
// Publish appends msg to the topic's message store.
func (t *SinglePartitionTopic) Publish(msg Message) {
	store := t.MsgStore
	store.Append(msg)
}
// Offset returns the current offset of the topic's message store.
func (t *SinglePartitionTopic) Offset() int {
	latest := t.MsgStore.Offset()
	return latest
}
---
File: /faltu/topic.go
---
package main
// Topic is a destination that messages can be published to.
type Topic interface {
// Offset reports the topic's current offset as an int.
Offset() int
// Publish adds msg to the topic.
Publish(msg Message)
}
---
File: /faltu/worker_pool_impl.go
---
package main
import "sync"
// WorkerPoolImpl runs submitted functions on a fixed number of worker
// goroutines that read from a shared buffered queue.
type WorkerPoolImpl struct {
// numWorkers is how many worker goroutines Start launches.
numWorkers int
// workQueue carries submitted work to the workers; Stop closes it.
workQueue chan func()
// wg tracks the running workers so Stop can wait for them to finish.
wg *sync.WaitGroup
// once makes Start launch the workers at most one time.
once sync.Once
// mu guards stopped.
mu sync.RWMutex
// stopped is set by Stop; SubmitWork drops work once it is true.
stopped bool
}
// newWorkerPoolImpl returns a pool configured for numWorkers workers and a
// work queue that buffers up to bufferSize pending items. The workers are
// not launched until Start is called.
func newWorkerPoolImpl(numWorkers, bufferSize int) WorkerPool {
	// The mutex, the Once and the stopped flag are usable as zero values.
	pool := new(WorkerPoolImpl)
	pool.numWorkers = numWorkers
	pool.workQueue = make(chan func(), bufferSize)
	pool.wg = new(sync.WaitGroup)
	return pool
}
// SubmitWork queues work for execution by a worker. Work submitted after the
// pool has been stopped is silently dropped.
//
// The call blocks while the queue is full. The read lock is held across the
// send on purpose: Stop must take the write lock before it closes the queue,
// so the queue cannot be closed between the stopped check and the send
// (which would panic with "send on closed channel").
//
// NOTE(review): if the pool was never started and the queue is full, this
// blocks indefinitely while holding the read lock, which also blocks Stop.
func (wp *WorkerPoolImpl) SubmitWork(work func()) {
	wp.mu.RLock()
	defer wp.mu.RUnlock()

	if wp.stopped {
		return
	}
	wp.workQueue <- work
}
// startWorker is the body of one worker goroutine: it executes queued work
// items one at a time until the queue is closed and drained, then marks
// itself done on the wait group.
func (wp *WorkerPoolImpl) startWorker() {
	defer wp.wg.Done()
	for {
		job, open := <-wp.workQueue
		if !open {
			return
		}
		job()
	}
}
// Start launches the pool's worker goroutines. Only the first call has any
// effect; later calls return without starting additional workers.
func (wp *WorkerPoolImpl) Start() {
	launch := func() {
		for i := 0; i < wp.numWorkers; i++ {
			// Register the worker before starting it so a later Wait
			// cannot miss it.
			wp.wg.Add(1)
			go wp.startWorker()
		}
	}
	wp.once.Do(launch)
}
// Stop shuts the pool down: it marks the pool as stopped, closes the work
// queue so workers exit once they have drained it, and waits for all workers
// to finish. Work already queued is still executed.
//
// Stop is safe to call more than once and from several goroutines. The
// stopped flag is checked and set under the write lock, so the queue is
// closed exactly once; a second close would panic.
func (wp *WorkerPoolImpl) Stop() {
	wp.mu.Lock()
	if !wp.stopped {
		wp.stopped = true
		// Closing tells workers that no more work will arrive while still
		// letting them finish what is already queued.
		close(wp.workQueue)
	}
	wp.mu.Unlock()

	// Wait outside the lock so goroutines that only need to read the stopped
	// flag are not held up while the workers drain the queue.
	wp.wg.Wait()
}
---
File: /faltu/worker_pool.go
---
package main
// WorkerPool executes submitted functions on background workers.
type WorkerPool interface {
// SubmitWork hands work to the pool for execution.
SubmitWork(work func())
// Start launches the pool's workers.
Start()
// Stop shuts the pool down.
Stop()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment