Created
January 18, 2023 23:19
-
-
Save puuble/a04402c3ba5488422dfa1a3cf57463a2 to your computer and use it in GitHub Desktop.
python event emitter with SQS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from queue import Queue | |
import boto3 | |
from pymongo import MongoClient | |
class LogService: | |
def __init__(self): | |
self.client = MongoClient('mongodb://localhost:27017/') | |
self.db = self.client['logs'] | |
def save_log(self, data): | |
try: | |
self.db['logs'].insert_one(data) | |
print(f"Saved log: {data}") | |
except Exception as e: | |
print(f"Error saving log: {e}") | |
def save_fail(self, data): | |
try: | |
self.db['fails'].insert_one(data) | |
print(f"Saved fail: {data}") | |
except Exception as e: | |
print(f"Error saving fail: {e}") | |
class EventEmitter: | |
def __init__(self): | |
self.listeners = {} | |
def on(self, event, listener): | |
if event not in self.listeners: | |
self.listeners[event] = [] | |
self.listeners[event].append(listener) | |
def emit(self, event, *args, **kwargs): | |
for listener in self.listeners.get(event, []): | |
listener(*args, **kwargs) | |
def worker(event_emitter, queue): | |
sqs = boto3.client('sqs', region_name='us-west-2', aws_access_key_id='ACCESS_KEY', | |
aws_secret_access_key='SECRET_KEY') | |
while True: | |
try: | |
result = sqs.receive_message(QueueUrl='https://sqs.us-west-2.amazonaws.com/1234567890/my-queue', | |
WaitTimeSeconds=20) | |
if 'Messages' in result: | |
delete_entries = [] | |
for message in result['Messages']: | |
body = message['Body'] | |
event_emitter.emit('message', body) | |
delete_entries.append({'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']}) | |
sqs.delete_message_batch(QueueUrl='https://sqs.us-west-2.amazonaws.com/1234567890/my-queue', | |
Entries=delete_entries) | |
except Exception as e: | |
print(f"Error processing SQS messages: {e}") | |
if __name__ == "__main__": | |
event_emitter = EventEmitter() | |
log_service = LogService() | |
event_emitter.on("message", lambda message: print(f"Received message from worker: {message}")) | |
event_emitter.on("message", lambda message: log_service.save_log({"message": f"Received message from worker: {message}"})) | |
num_of_workers = 4 | |
for _ in range(num_of_workers): | |
thread = threading.Thread(target=worker, args=(event_emitter, Queue())) | |
thread.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment