Skip to content

Instantly share code, notes, and snippets.

@shivendrasoni
Last active May 20, 2023 10:55
Show Gist options
  • Save shivendrasoni/298da557b471f5a851843f118dee76ce to your computer and use it in GitHub Desktop.
Save shivendrasoni/298da557b471f5a851843f118dee76ce to your computer and use it in GitHub Desktop.
class Message:
    """A single queued payload together with the set of readers that consumed it."""

    def __init__(self, content):
        """Store *content* and start with an empty, per-instance ``read_by`` set."""
        # A fresh set is created for every message, so read state is never
        # shared between messages.
        self.read_by = set()
        self.content = content
class Topic:
    """A named, ordered list of messages fanned out to consumer groups.

    Every registered consumer group receives each retained message once, in
    insertion order. A message is dropped from the topic as soon as every
    currently registered group has read it, so a group registered later only
    sees the messages that are still retained at that point.
    """

    def __init__(self, name):
        """Create an empty topic called *name* with no consumer groups."""
        self.name = name
        self.messages = []
        self.consumer_groups = {}

    def add_message(self, message):
        """Wrap *message* in a ``Message`` and append it to the topic."""
        self.messages.append(Message(message))

    def add_consumer_group(self, consumer_group):
        """Register *consumer_group* under its ``name`` attribute.

        Registering a group whose name is already present replaces the stored
        object. Read progress is tracked by name, so the group keeps its
        position and is not handed already-consumed messages again.
        """
        self.consumer_groups[consumer_group.name] = consumer_group

    def read_message(self, consumer_group_name):
        """Return the next unread message content for the named group.

        Args:
            consumer_group_name: Name of a previously registered group.

        Returns:
            The content of the oldest message this group has not read yet, or
            ``None`` when the group has read everything that is retained.

        Raises:
            Exception: If no group with that name is registered on the topic.
        """
        if consumer_group_name not in self.consumer_groups:
            raise Exception(f"Consumer group {consumer_group_name} does not exist")

        for index, message in enumerate(self.messages):
            if consumer_group_name in message.read_by:
                continue
            # Track readers by name rather than by object identity so that a
            # re-registered group is still recognised as having read the message.
            message.read_by.add(consumer_group_name)
            # Drop the message once every registered group has seen it. A
            # subset test (instead of comparing sizes) stays correct even if
            # ``read_by`` holds names that are not registered here.
            if self.consumer_groups.keys() <= message.read_by:
                # Safe although we are iterating: we return immediately after.
                del self.messages[index]
            return message.content
        return None
class ConsumerGroup:
    """A consumer group, identified solely by its name."""

    def __init__(self, name):
        """Remember *name* as this group's identifier."""
        self.name = name
class MessageQueue:
    """In-memory registry of topics, addressed by topic name."""

    def __init__(self):
        """Start with no topics."""
        self.topics = {}

    def create_topic(self, topic_name, consumer_group_names):
        """Ensure *topic_name* exists and has the given consumer groups.

        The topic is created on first use. Calling this again for an existing
        topic only adds the names that are not registered yet; groups that
        already exist are left untouched so their read state is not reset.

        Args:
            topic_name: Name of the topic to create or extend.
            consumer_group_names: Iterable of consumer group names to register.
        """
        # NOTE(review): the original indentation was lost; this assumes group
        # registration also ran for an already existing topic. Confirm.
        if topic_name not in self.topics:
            self.topics[topic_name] = Topic(topic_name)
        topic = self.topics[topic_name]
        for group_name in consumer_group_names:
            if group_name not in topic.consumer_groups:
                topic.add_consumer_group(ConsumerGroup(group_name))

    def push(self, topic_name, message):
        """Append *message* to the named topic.

        Raises:
            Exception: If the topic does not exist.
        """
        self._get_topic(topic_name).add_message(message)

    def poll(self, topic_name, consumer_group_name):
        """Return the next unread message of the topic for the given group.

        Returns whatever ``Topic.read_message`` returns for that group.

        Raises:
            Exception: If the topic does not exist (errors raised by
                ``Topic.read_message`` propagate unchanged).
        """
        return self._get_topic(topic_name).read_message(consumer_group_name)

    def _get_topic(self, topic_name):
        """Look up a topic by name, raising the shared 'does not exist' error."""
        try:
            return self.topics[topic_name]
        except KeyError:
            # Keep the original exception type and message for existing callers.
            raise Exception(f"Topic {topic_name} does not exist") from None
# Demo: one topic with two consumer groups; every group receives every message
# once, in the order the messages were pushed.
mq = MessageQueue()
mq.create_topic("logs", ["analytics", "dev"])

for _body in ("Message 1", "Message 2", "Message 3"):
    mq.push("logs", _body)

# Three rounds, one per message, alternating analytics then dev, so the output
# is: Message 1, Message 1, Message 2, Message 2, Message 3, Message 3.
for _round in range(3):
    for _group in ("analytics", "dev"):
        print(mq.poll("logs", _group))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment