To parallelize job processing across multiple servers while ensuring each message is processed only once, we can use Redis' List data structure as a FIFO queue, along with its atomic operations. Here's how you can modify the approach:

- Use a Redis List as a FIFO queue.
- Use Redis' BRPOPLPUSH command for reliable queue processing.
- Implement a worker model that can run on multiple servers.

Key points about this implementation:

FIFO Queue: We use Redis' List data structure (job_queue) as a FIFO queue. LPUSH adds jobs to the left (front) of the list, and BRPOPLPUSH removes from the right (back), ensuring FIFO order.

Atomic Operations: BRPOPLPUSH atomically moves a job from the main queue to a processing queue. This ensures that even if a worker crashes, the job isn't lost and can be recovered.

Parallel Processing: Multiple worker processes can run this code on different servers. Redis ensures each job is given to only one worker.

Reliability: If a worker crashes while processing a job, the job remains in the processing queue and isn't lost. You could implement a separate process that checks for and requeues stalled jobs from the processing queue (a sketch of such a requeue process appears after the main code below).

Deduplication: The deduplication logic remains the same, preventing repeated processing of the same job ID.

To use this in a production environment:

- Run the worker function on multiple servers. Each server can also run multiple worker processes.
- Implement proper error handling, logging, and monitoring.
- Consider using a Redis connection pool for better performance (a minimal sketch follows this list).
- Implement a mechanism to handle stalled jobs in the processing queue (jobs that were being processed when a worker crashed); see the requeue sketch after the main code.
- Consider implementing a dead-letter queue for jobs that consistently fail processing.
- Use Redis persistence and replication for data durability and high availability.
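
For the connection-pool point, here is a minimal sketch using redis-py's built-in ConnectionPool. It assumes a local Redis instance, and the max_connections value is purely illustrative:

import redis

# A shared pool lets every producer and worker in the process reuse connections
# instead of opening a new TCP connection per Redis command.
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=20)
r = redis.Redis(connection_pool=pool)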

This setup allows you to scale horizontally by adding more worker servers, while ensuring each job is processed only once and in the order it was received. The use of Redis as a centralized queue enables coordination between distributed workers without them needing to communicate directly with each other.

import redis
import json
import time
from datetime import timedelta

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Queue names
MAIN_QUEUE = 'job_queue'
PROCESSING_QUEUE = 'processing_queue'

# Webhook handler (producer)
def webhook_handler(data):
    message = json.dumps(data)
    r.lpush(MAIN_QUEUE, message)
    print(f"Enqueued: {message}")

# Worker function (consumer)
def worker():
    while True:
        # Atomically move a job from the main queue to the processing queue
        job = r.brpoplpush(MAIN_QUEUE, PROCESSING_QUEUE, timeout=0)
        if job:
            try:
                data = json.loads(job)
                job_id = data.get('job_id')
                # Deduplication logic
                if not is_duplicate(job_id):
                    process_job(data)
                # Remove the job from the processing queue
                r.lrem(PROCESSING_QUEUE, 1, job)
            except Exception as e:
                print(f"Error processing job: {e}")
                # Optionally, move to a dead-letter queue instead of removing
                r.lpush('dead_letter_queue', job)
                r.lrem(PROCESSING_QUEUE, 1, job)

def is_duplicate(job_id):
    # SET ... NX returns None if the key already exists, so a falsy result means "seen before"
    key = f"processed_job:{job_id}"
    is_new = r.set(key, 1, ex=timedelta(hours=1), nx=True)
    return not is_new

def process_job(data):
    # Your job processing logic here
    print(f"Processing job: {data}")
    time.sleep(1)  # Simulate some work

# Example usage
if __name__ == "__main__":
    import threading

    # Simulate multiple workers (in practice, these would be on different servers).
    # Daemon threads so the demo can exit; real workers would run indefinitely.
    num_workers = 3
    workers = [threading.Thread(target=worker, daemon=True) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # Simulate webhook calls
    for i in range(10):
        webhook_data = {"job_id": f"{i}", "task": f"task_{i}"}
        webhook_handler(webhook_data)

    # Give the workers time to drain the queue (they block forever on BRPOPLPUSH, so they can't be joined)
    time.sleep(15)