Skip to content

Instantly share code, notes, and snippets.

@puuble
Created January 18, 2023 23:19
Show Gist options
  • Save puuble/a04402c3ba5488422dfa1a3cf57463a2 to your computer and use it in GitHub Desktop.
Save puuble/a04402c3ba5488422dfa1a3cf57463a2 to your computer and use it in GitHub Desktop.
Python event emitter with SQS
import threading
from queue import Queue
import boto3
from pymongo import MongoClient
class LogService:
    """Persist log and failure records to MongoDB.

    Records are written to the ``logs`` and ``fails`` collections of a single
    database. Persistence is best-effort: a failed insert is reported on
    stdout and is never raised to the caller.
    """

    def __init__(self, uri='mongodb://localhost:27017/', db_name='logs', client=None):
        """Bind the service to a database.

        Args:
            uri: MongoDB connection string, used only when ``client`` is None.
            db_name: Name of the database that holds the collections.
            client: Optional ready-made client (any object supporting
                ``client[db_name]``). When given, ``uri`` is ignored, which
                lets callers share a connection or substitute a stand-in.
        """
        self.client = MongoClient(uri) if client is None else client
        self.db = self.client[db_name]

    def _save(self, collection, label, data):
        """Insert ``data`` into ``collection`` and print the outcome."""
        try:
            # The collection lookup stays inside the try so that any failure
            # on the way to the insert is reported instead of raised.
            self.db[collection].insert_one(data)
            print(f"Saved {label}: {data}")
        except Exception as e:
            print(f"Error saving {label}: {e}")

    def save_log(self, data):
        """Store ``data`` in the ``logs`` collection; errors are printed."""
        self._save('logs', 'log', data)

    def save_fail(self, data):
        """Store ``data`` in the ``fails`` collection; errors are printed."""
        self._save('fails', 'fail', data)
class EventEmitter:
    """Minimal synchronous publish/subscribe dispatcher.

    Listeners are plain callables stored per event name and invoked in
    registration order on the thread that calls ``emit``.
    """

    def __init__(self):
        """Start with no registered listeners."""
        # Maps event name -> list of callables, in registration order.
        self.listeners = {}

    def on(self, event, listener):
        """Register ``listener`` to be called whenever ``event`` is emitted.

        The same callable may be registered more than once; it is then
        called once per registration.
        """
        self.listeners.setdefault(event, []).append(listener)

    def emit(self, event, *args, **kwargs):
        """Call every listener registered for ``event`` with the given arguments.

        Emitting an event with no listeners is a no-op. An exception raised
        by a listener propagates to the caller and the remaining listeners
        are skipped.
        """
        # Dispatch over a snapshot: a listener that calls ``on()`` for this
        # same event takes effect from the next emit, rather than extending
        # (possibly endlessly) the list being iterated.
        for listener in tuple(self.listeners.get(event, ())):
            listener(*args, **kwargs)
def worker(event_emitter, queue):
    """Poll an SQS queue forever, emitting a ``'message'`` event per message.

    Each received message body is passed to
    ``event_emitter.emit('message', body)``. Messages whose listeners
    completed without raising are then deleted from the queue in one batch.
    A message whose listener raised is left on the queue, so SQS can
    redeliver it, and does not block deletion of the others in the batch.

    Args:
        event_emitter: Object exposing ``emit(event, *args)``.
        queue: Unused by this function; kept so existing callers keep working.

    This function never returns; errors are printed and polling resumes.
    """
    import time  # local import: only needed for the error back-off below

    queue_url = 'https://sqs.us-west-2.amazonaws.com/1234567890/my-queue'
    retry_delay_seconds = 1

    # NOTE(review): 'ACCESS_KEY' / 'SECRET_KEY' are placeholders. Do not commit
    # real credentials here; omitting both arguments lets boto3 resolve
    # credentials from its default chain (environment, shared config, role).
    sqs = boto3.client('sqs', region_name='us-west-2', aws_access_key_id='ACCESS_KEY',
                       aws_secret_access_key='SECRET_KEY')
    while True:
        try:
            # Long poll: wait up to 20 seconds for a message to arrive.
            result = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20)
            delete_entries = []
            for message in result.get('Messages', []):
                try:
                    event_emitter.emit('message', message['Body'])
                except Exception as e:
                    # Leave this message on the queue for redelivery, but keep
                    # going so already-handled messages still get deleted.
                    print(f"Error handling SQS message {message.get('MessageId')}: {e}")
                    continue
                delete_entries.append({'Id': message['MessageId'],
                                       'ReceiptHandle': message['ReceiptHandle']})
            # An empty Entries list is rejected by SQS, so skip the call then.
            if delete_entries:
                response = sqs.delete_message_batch(QueueUrl=queue_url, Entries=delete_entries)
                # Batch deletes can fail per entry without raising.
                for failure in response.get('Failed', []):
                    print(f"Error deleting SQS message {failure.get('Id')}: {failure.get('Message')}")
        except Exception as e:
            print(f"Error processing SQS messages: {e}")
            # Back off so a persistent failure (bad credentials, no network)
            # does not spin this loop at full speed.
            time.sleep(retry_delay_seconds)
if __name__ == "__main__":
    # Script entry point: wire two "message" listeners (console echo, then
    # MongoDB log) onto one shared emitter and start the SQS polling threads.
    event_emitter = EventEmitter()
    log_service = LogService()

    def _announce(message):
        """Echo a received message to stdout."""
        print(f"Received message from worker: {message}")

    def _record(message):
        """Persist a received message through the log service."""
        log_service.save_log({"message": f"Received message from worker: {message}"})

    # Registration order is the call order: echo first, then persist.
    for _handler in (_announce, _record):
        event_emitter.on("message", _handler)

    num_of_workers = 4
    # Every worker shares the emitter and receives its own fresh Queue.
    _threads = [
        threading.Thread(target=worker, args=(event_emitter, Queue()))
        for _ in range(num_of_workers)
    ]
    for _thread in _threads:
        _thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment