Skip to content

Instantly share code, notes, and snippets.

@Vaansh
Created April 5, 2023 17:28
Show Gist options
  • Save Vaansh/f996220be3ee83fd29857fd50311e7bf to your computer and use it in GitHub Desktop.
Save Vaansh/f996220be3ee83fd29857fd50311e7bf to your computer and use it in GitHub Desktop.
Many to one example
#!/usr/bin/env python
from __future__ import annotations
import logging
import time
from abc import ABC, abstractmethod
from enum import Enum
from multiprocessing import Process
import pika
logging.basicConfig(level=logging.INFO)
class Platform(Enum):
SUB = 1
PUBA = 2
PUBB = 3
class Publisher(ABC):
@abstractmethod
def publish(self):
pass
class Subscriber(ABC):
@abstractmethod
def subscribe(self, msg):
pass
class ConcretePublisherA(Publisher):
def __init__(self, identifier, mq):
self.identifier = identifier
self.mq = mq
def publish(self):
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
body = "A {} Published".format(self.identifier)
for _ in range(5):
time.sleep(2)
channel.basic_publish(exchange="", routing_key=self.mq, body=body)
logging.info("{} ->".format(body))
logging.info("A Disconnected.")
connection.close()
class ConcretePublisherB(Publisher):
def __init__(self, identifier, mq):
self.identifier = identifier
self.mq = mq
def publish(self):
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
body = "B {} Published".format(self.identifier)
for _ in range(3):
time.sleep(3)
channel.basic_publish(exchange="", routing_key=self.mq, body=body)
logging.info("{} ->".format(body))
logging.info("B Disconnected.")
connection.close()
class ConcreteSubscriber(Subscriber):
def __init__(self, identifier):
self.id = identifier
def subscribe(self, mq):
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
channel.queue_declare(queue=mq)
def process(ch, method, properties, body):
logging.info("Subscriber notified <- " % body)
channel.basic_consume(queue=mq, on_message_callback=process, auto_ack=True)
logging.info("Subscriber Added. Listening on MQ ({}).".format(mq))
channel.start_consuming()
class Config:
def __init__(self):
self.mq = None
self.sub = None
self.pubs = []
self.sub_platform = None
def run(self):
if self.sub == None or len(self.pubs) == 0:
logging.error("Sub or Pub(s) not registered or configured incorrectly")
return
for pub in self.pubs:
process = Process(target=pub.publish)
process.start()
def register_subscriber(self, platform, identifier):
self.mq = identifier + platform.name
self.sub_platform = platform
match platform:
case Platform.SUB:
sub = ConcreteSubscriber(identifier=identifier)
case _:
sub = None
self.sub = sub
process = Process(target=self.sub.subscribe, args=(self.mq,))
process.start()
def register_publisher(self, platform, identifier):
if not (self.sub and self.sub_platform and self.mq):
logging.error("Sub not registered or configured incorrectly")
return
match platform:
case Platform.PUBA:
pub = ConcretePublisherA(identifier=identifier, mq=self.mq)
case Platform.PUBB:
pub = ConcretePublisherB(identifier=identifier, mq=self.mq)
case _:
pub = None
self.pubs.append(pub)
if __name__ == "__main__":
cfg = Config()
cfg.register_subscriber(Platform.SUB, "abc")
cfg.register_publisher(Platform.PUBA, "123")
cfg.register_publisher(Platform.PUBB, "456")
cfg.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment