Skip to content

Instantly share code, notes, and snippets.

@gaeldelalleau
Created February 4, 2025 16:27
Show Gist options
  • Save gaeldelalleau/22b210c6a48a5e2716fab37a50dc994d to your computer and use it in GitHub Desktop.
Save gaeldelalleau/22b210c6a48a5e2716fab37a50dc994d to your computer and use it in GitHub Desktop.
### **Background**
You are working as a **senior software engineer** at **QuantumCompute Inc.**, a research company specializing in **high-performance distributed computing**. Your team is designing a **fault-tolerant, dynamically scalable task scheduler** that optimizes computational workloads across multiple nodes in a **heterogeneous cluster environment**.
The system must support:
* **Asynchronous task execution** with priority-based scheduling.
* **Load balancing** across distributed nodes.
* **Failure recovery** with stateful checkpointing.
* **Adaptive scaling**, dynamically adding/removing compute nodes.
* **Handling both CPU-intensive and memory-intensive workloads efficiently.**
Your team lead, **Dr. Elena Vasquez**, outlined key requirements:
1. **Tasks should be executed asynchronously** and prioritized based on urgency and resource constraints.
2. The **task queue should support preemption**—low-priority tasks can be delayed if a high-priority task arrives.
3. **Load balancing logic** should be adaptive—nodes with excess capacity should take over tasks from overloaded nodes.
4. **Failure handling** should include **automatic checkpointing**—if a task fails on one node, it should restart on another.
5. **The system should operate at scale**, efficiently handling thousands of tasks across hundreds of nodes.
### **Current Implementation (Buggy Code)**
The system is implemented in **Python**, using:
* asyncio for asynchronous task execution.
* multiprocessing for parallel execution.
* redis as a task queue.
* grpc for inter-node communication.
However, a **critical bug** has been reported:
#### **Issue:**
When a node goes down unexpectedly, **some in-progress tasks are lost** instead of being reassigned.
#### **Expected Behavior:**
Tasks from failed nodes should be **checkpointed** and **rescheduled** on another available node.
Below is the current **task scheduler implementation**, which has a **bug in failure handling**:\`\`\`\`import asyncio
import redis
import multiprocessing
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
class TaskScheduler:
def \_\_init\_\_(self, redis\_host="localhost", redis\_port=6379):
"""Initialize Redis queue, task tracking, and worker threads"""
self.queue = redis.Redis(host=redis\_host, port=redis\_port, db=0)
self.running\_tasks = {} # To track active tasks
self.completed\_tasks = set()
self.failed\_tasks = set()
self.lock = threading.Lock()
self.executor = ThreadPoolExecutor(max\_workers=5)
self.node\_id = random.randint(1000, 9999)
async def submit\_task(self, task\_id, priority, payload=None):
"""Submits a new task with a given priority."""
task\_data = {"priority": priority, "task\_id": task\_id, "payload": payload or {}}
self.queue.rpush("tasks", str(task\_data))
async def execute\_tasks(self):
"""Continuously executes tasks from the queue."""
while True:
task\_data = self.queue.lpop("tasks")
if task\_data:
task\_data = eval(task\_data.decode('utf-8'))
priority, task\_id = task\_data\["priority"\], task\_data\["task\_id"\]
with self.lock:
self.running\_tasks\[task\_id\] = time.time()
print(f"\[Node {self.node\_id}\] Executing task {task\_id} with priority {priority}")
await asyncio.sleep(random.randint(1, 5)) # Simulate task execution
with self.lock:
del self.running\_tasks\[task\_id\]
self.completed\_tasks.add(task\_id)
def resubmit\_failed\_tasks(self):
"""Resubmit tasks that failed during execution"""
for task\_id in list(self.failed\_tasks):
print(f"Resubmitting failed task {task\_id}")
asyncio.run(self.submit\_task(task\_id, priority=1))
self.failed\_tasks.remove(task\_id)
def get\_active\_tasks(self):
"""Returns a list of all active tasks"""
with self.lock:
return list(self.running\_tasks.keys())
def get\_completed\_tasks(self):
"""Returns a list of all completed tasks"""
return list(self.completed\_tasks)
def get\_failed\_tasks(self):
"""Returns a list of all failed tasks"""
return list(self.failed\_tasks)
def node\_health\_check(self):
"""Simulates a node health check by detecting failures"""
while True:
print(f"\[Node {self.node\_id}\] Health Check: Running tasks {len(self.running\_tasks)}")
if random.random() < 0.1: # Simulate node failure 10% of the time
print(f"\[Node {self.node\_id}\] CRITICAL FAILURE DETECTED!")
self.failed\_tasks.update(self.running\_tasks.keys())
self.running\_tasks.clear()
time.sleep(5)
def distribute\_tasks(self):
"""Distributes tasks among available worker threads"""
while True:
active\_workers = threading.active\_count()
if active\_workers < 5:
print(f"\[Node {self.node\_id}\] Distributing tasks... Active workers: {active\_workers}")
threading.Thread(target=self.run\_task, daemon=True).start()
time.sleep(3)
def run\_task(self):
"""Executes a single task asynchronously"""
asyncio.run(self.execute\_tasks())
def cleanup\_completed\_tasks(self):
"""Periodically removes old completed tasks from memory"""
while True:
with self.lock:
self.completed\_tasks = set(list(self.completed\_tasks)\[-100:\]) # Keep only last 100 completed
time.sleep(10)
def task\_preemption(self, new\_task\_id, new\_priority):
"""Preempts a lower-priority task if a higher-priority task arrives"""
with self.lock:
if self.running\_tasks:
lowest\_priority\_task = min(self.running\_tasks.items(), key=lambda x: x\[1\])
if new\_priority > lowest\_priority\_task\[1\]:
print(f"Preempting task {lowest\_priority\_task\[0\]} for new task {new\_task\_id}")
self.failed\_tasks.add(lowest\_priority\_task\[0\])
self.running\_tasks.pop(lowest\_priority\_task\[0\])
asyncio.run(self.submit\_task(new\_task\_id, new\_priority))
def load\_balancing(self):
"""Simulates dynamic load balancing between nodes"""
while True:
node\_load = len(self.running\_tasks)
print(f"\[Node {self.node\_id}\] Current Load: {node\_load} tasks")
if node\_load > 10:
print(f"\[Node {self.node\_id}\] Overloaded! Offloading tasks...")
for \_ in range(3):
task\_id = random.choice(list(self.running\_tasks.keys()))
self.running\_tasks.pop(task\_id)
asyncio.run(self.submit\_task(task\_id, priority=1))
time.sleep(5)
def monitor\_queue(self):
"""Monitors the queue size and adjusts resource allocation"""
while True:
queue\_size = self.queue.llen("tasks")
print(f"\[Node {self.node\_id}\] Queue Size: {queue\_size}")
if queue\_size > 20:
print(f"\[Node {self.node\_id}\] Queue overload detected! Spawning extra workers...")
self.executor.submit(self.run\_task)
time.sleep(5)
def check\_task\_staleness(self):
"""Detects stale tasks that are taking too long"""
while True:
current\_time = time.time()
with self.lock:
for task\_id, start\_time in list(self.running\_tasks.items()):
if current\_time - start\_time > 10: # Assume 10s is too long
print(f"\[Node {self.node\_id}\] Task {task\_id} is stale, reassigning...")
self.failed\_tasks.add(task\_id)
self.running\_tasks.pop(task\_id)
asyncio.run(self.submit\_task(task\_id, priority=1))
time.sleep(3)
def start\_scheduler(self):
"""Starts the scheduler with multiple background tasks"""
print(f"\[Node {self.node\_id}\] Starting scheduler...")
threading.Thread(target=self.node\_health\_check, daemon=True).start()
threading.Thread(target=self.distribute\_tasks, daemon=True).start()
threading.Thread(target=self.cleanup\_completed\_tasks, daemon=True).start()
threading.Thread(target=self.load\_balancing, daemon=True).start()
threading.Thread(target=self.monitor\_queue, daemon=True).start()
threading.Thread(target=self.check\_task\_staleness, daemon=True).start()
self.run\_task()
scheduler = TaskScheduler()
scheduler.start\_scheduler()
\`\`\`
### **Identified Problems**
1. **No Fault Tolerance**: If a task is in progress and the node crashes, it is lost because no checkpointing mechanism exists.
2. **No Task Reassignment**: When a node goes down, pending tasks remain unprocessed.
3. **Inefficient Load Balancing**: The task queue does not dynamically redistribute tasks across nodes.
4. **Synchronous Execution in a Distributed Setting**: Tasks are handled sequentially by a single node, making the system inefficient at scale.
**Your Task**
-------------
1. **Identify the problem** in the existing code.
2. **Propose a robust solution** to ensure **fault-tolerant task reassignment**.
3. **Modify the existing code** to fix the bug.
4. **Ensure the new implementation supports dynamic task recovery.**
5. **Use Redis to store checkpoints so that tasks from failed nodes can be resumed by other nodes.**
6. **Implement a distributed health-check mechanism to detect node failures.**
7. **Ensure high performance by using asynchronous task management and efficient inter-node communication.**
**Additional Requirements**
---------------------------
* The solution should be **scalable** and **efficient**.
* It must use **asynchronous programming** effectively.
* It should handle **node failures gracefully**.
* The task queue should support **preemption**, meaning high-priority tasks can displace lower-priority ones.
**Evaluation Criteria**
-----------------------
1. **Correctness** – Does the model correctly identify the issue and propose a valid fix?
2. **Code Quality** – Is the solution well-structured and idiomatic?
3. **Scalability** – Does the fix account for a distributed system with multiple nodes?
4. **Long-Context Handling** – Does the model utilize all relevant details from the 2048-token prompt?
5. **Efficiency** – Is the revised system optimized for high-throughput execution?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment