### **Background**

You are working as a **senior software engineer** at **QuantumCompute Inc.**, a research company specializing in **high-performance distributed computing**. Your team is designing a **fault-tolerant, dynamically scalable task scheduler** that optimizes computational workloads across multiple nodes in a **heterogeneous cluster environment**.

The system must support:

* **Asynchronous task execution** with priority-based scheduling.
* **Load balancing** across distributed nodes.
* **Failure recovery** with stateful checkpointing.
* **Adaptive scaling**, dynamically adding and removing compute nodes.
* **Efficient handling of both CPU-intensive and memory-intensive workloads.**
Your team lead, **Dr. Elena Vasquez**, has outlined the key requirements:

1. **Tasks should be executed asynchronously** and prioritized based on urgency and resource constraints (a minimal illustration follows this list).
2. The **task queue should support preemption**—low-priority tasks can be delayed if a high-priority task arrives.
3. **Load balancing should be adaptive**—nodes with excess capacity should take over tasks from overloaded nodes.
4. **Failure handling should include automatic checkpointing**—if a task fails on one node, it should restart on another.
5. **The system should operate at scale**, efficiently handling thousands of tasks across hundreds of nodes.
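For orientation, here is a minimal sketch of what requirement 1 can look like in isolation, using `asyncio`'s built-in priority queue. The worker name, task ids, and priorities are illustrative and not part of the existing codebase:

```python
import asyncio

async def worker(name: str, queue: asyncio.PriorityQueue) -> None:
    """Repeatedly take the most urgent task (lowest priority number) and run it."""
    while True:
        priority, task_id = await queue.get()
        print(f"{name}: running {task_id} (priority {priority})")
        await asyncio.sleep(0.1)  # stand-in for real work
        queue.task_done()

async def main() -> None:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # Lower number = more urgent; tuple ordering does the scheduling.
    await queue.put((5, "nightly-batch"))
    await queue.put((1, "urgent-report"))
    await queue.put((3, "metrics-rollup"))

    workers = [asyncio.create_task(worker(f"worker-{i}", queue)) for i in range(2)]
    await queue.join()   # block until every queued task has been processed
    for w in workers:    # the workers loop forever, so cancel them explicitly
        w.cancel()

asyncio.run(main())
```

This only covers single-node prioritization; the distributed concerns (preemption, checkpointing, node failure) are what the rest of this exercise is about.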
### **Current Implementation (Buggy Code)**

The system is implemented in **Python**, using:

* `asyncio` for asynchronous task execution.
* `multiprocessing` for parallel execution.
* `redis` as a task queue.
* `grpc` for inter-node communication.

However, a **critical bug** has been reported:

#### **Issue:**

When a node goes down unexpectedly, **some in-progress tasks are lost** instead of being reassigned.

#### **Expected Behavior:**

Tasks from failed nodes should be **checkpointed** and **rescheduled** on another available node.
Below is the current **task scheduler implementation**, which has a **bug in failure handling**:

```python
import asyncio
import redis
import multiprocessing
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor


class TaskScheduler:
    def __init__(self, redis_host="localhost", redis_port=6379):
        """Initialize Redis queue, task tracking, and worker threads"""
        self.queue = redis.Redis(host=redis_host, port=redis_port, db=0)
        self.running_tasks = {}  # To track active tasks
        self.completed_tasks = set()
        self.failed_tasks = set()
        self.lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.node_id = random.randint(1000, 9999)

    async def submit_task(self, task_id, priority, payload=None):
        """Submits a new task with a given priority."""
        task_data = {"priority": priority, "task_id": task_id, "payload": payload or {}}
        self.queue.rpush("tasks", str(task_data))

    async def execute_tasks(self):
        """Continuously executes tasks from the queue."""
        while True:
            task_data = self.queue.lpop("tasks")
            if task_data:
                task_data = eval(task_data.decode("utf-8"))
                priority, task_id = task_data["priority"], task_data["task_id"]
                with self.lock:
                    self.running_tasks[task_id] = time.time()
                print(f"[Node {self.node_id}] Executing task {task_id} with priority {priority}")
                await asyncio.sleep(random.randint(1, 5))  # Simulate task execution
                with self.lock:
                    del self.running_tasks[task_id]
                    self.completed_tasks.add(task_id)

    def resubmit_failed_tasks(self):
        """Resubmit tasks that failed during execution"""
        for task_id in list(self.failed_tasks):
            print(f"Resubmitting failed task {task_id}")
            asyncio.run(self.submit_task(task_id, priority=1))
            self.failed_tasks.remove(task_id)

    def get_active_tasks(self):
        """Returns a list of all active tasks"""
        with self.lock:
            return list(self.running_tasks.keys())

    def get_completed_tasks(self):
        """Returns a list of all completed tasks"""
        return list(self.completed_tasks)

    def get_failed_tasks(self):
        """Returns a list of all failed tasks"""
        return list(self.failed_tasks)

    def node_health_check(self):
        """Simulates a node health check by detecting failures"""
        while True:
            print(f"[Node {self.node_id}] Health Check: Running tasks {len(self.running_tasks)}")
            if random.random() < 0.1:  # Simulate node failure 10% of the time
                print(f"[Node {self.node_id}] CRITICAL FAILURE DETECTED!")
                self.failed_tasks.update(self.running_tasks.keys())
                self.running_tasks.clear()
            time.sleep(5)

    def distribute_tasks(self):
        """Distributes tasks among available worker threads"""
        while True:
            active_workers = threading.active_count()
            if active_workers < 5:
                print(f"[Node {self.node_id}] Distributing tasks... Active workers: {active_workers}")
                threading.Thread(target=self.run_task, daemon=True).start()
            time.sleep(3)

    def run_task(self):
        """Executes a single task asynchronously"""
        asyncio.run(self.execute_tasks())

    def cleanup_completed_tasks(self):
        """Periodically removes old completed tasks from memory"""
        while True:
            with self.lock:
                self.completed_tasks = set(list(self.completed_tasks)[-100:])  # Keep only last 100 completed
            time.sleep(10)

    def task_preemption(self, new_task_id, new_priority):
        """Preempts a lower-priority task if a higher-priority task arrives"""
        with self.lock:
            if self.running_tasks:
                lowest_priority_task = min(self.running_tasks.items(), key=lambda x: x[1])
                if new_priority > lowest_priority_task[1]:
                    print(f"Preempting task {lowest_priority_task[0]} for new task {new_task_id}")
                    self.failed_tasks.add(lowest_priority_task[0])
                    self.running_tasks.pop(lowest_priority_task[0])
                    asyncio.run(self.submit_task(new_task_id, new_priority))

    def load_balancing(self):
        """Simulates dynamic load balancing between nodes"""
        while True:
            node_load = len(self.running_tasks)
            print(f"[Node {self.node_id}] Current Load: {node_load} tasks")
            if node_load > 10:
                print(f"[Node {self.node_id}] Overloaded! Offloading tasks...")
                for _ in range(3):
                    task_id = random.choice(list(self.running_tasks.keys()))
                    self.running_tasks.pop(task_id)
                    asyncio.run(self.submit_task(task_id, priority=1))
            time.sleep(5)

    def monitor_queue(self):
        """Monitors the queue size and adjusts resource allocation"""
        while True:
            queue_size = self.queue.llen("tasks")
            print(f"[Node {self.node_id}] Queue Size: {queue_size}")
            if queue_size > 20:
                print(f"[Node {self.node_id}] Queue overload detected! Spawning extra workers...")
                self.executor.submit(self.run_task)
            time.sleep(5)

    def check_task_staleness(self):
        """Detects stale tasks that are taking too long"""
        while True:
            current_time = time.time()
            with self.lock:
                for task_id, start_time in list(self.running_tasks.items()):
                    if current_time - start_time > 10:  # Assume 10s is too long
                        print(f"[Node {self.node_id}] Task {task_id} is stale, reassigning...")
                        self.failed_tasks.add(task_id)
                        self.running_tasks.pop(task_id)
                        asyncio.run(self.submit_task(task_id, priority=1))
            time.sleep(3)

    def start_scheduler(self):
        """Starts the scheduler with multiple background tasks"""
        print(f"[Node {self.node_id}] Starting scheduler...")
        threading.Thread(target=self.node_health_check, daemon=True).start()
        threading.Thread(target=self.distribute_tasks, daemon=True).start()
        threading.Thread(target=self.cleanup_completed_tasks, daemon=True).start()
        threading.Thread(target=self.load_balancing, daemon=True).start()
        threading.Thread(target=self.monitor_queue, daemon=True).start()
        threading.Thread(target=self.check_task_staleness, daemon=True).start()
        self.run_task()


scheduler = TaskScheduler()
scheduler.start_scheduler()
```
### **Identified Problems**

1. **No Fault Tolerance**: If a task is in progress and the node crashes, it is lost because no checkpointing mechanism exists (a sketch of one possible remedy follows this list).
2. **No Task Reassignment**: When a node goes down, its pending tasks remain unprocessed.
3. **Inefficient Load Balancing**: The task queue does not dynamically redistribute tasks across nodes.
4. **Synchronous Execution in a Distributed Setting**: Tasks are handled sequentially by a single node, making the system inefficient at scale.
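To make problems 1 and 2 concrete: the scheduler removes a task from Redis with `LPOP`, after which the only copy lives in that worker's memory, so a crash erases it. One common remedy (a sketch only; the key names and node id are assumptions, not part of the current code) is Redis's reliable-queue pattern, where the pop atomically moves the task onto a per-node "processing" list that survives the crash and can be drained by another node:

```python
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
NODE_ID = "node-1234"  # illustrative; each node would advertise its own id


def fetch_task():
    """Atomically move one task from the shared queue onto this node's processing list.

    Unlike LPOP, the task is never held only in process memory: if this node dies
    mid-task, the entry still sits in tasks:processing:<node_id>, where another
    node can find it. LMOVE needs Redis 6.2+; older servers would use RPOPLPUSH.
    """
    return r.lmove("tasks", f"tasks:processing:{NODE_ID}", "LEFT", "RIGHT")


def acknowledge(task_data: bytes) -> None:
    """Drop the task from the processing list once it has finished successfully."""
    r.lrem(f"tasks:processing:{NODE_ID}", 1, task_data)


def recover_tasks_from(dead_node_id: str) -> None:
    """Requeue everything a failed node still had in flight."""
    key = f"tasks:processing:{dead_node_id}"
    while (task_data := r.rpop(key)) is not None:
        r.rpush("tasks", task_data)
```

Deciding *when* to call `recover_tasks_from()` is the job of a health-check mechanism such as the one sketched under "Your Task" below.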
**Your Task**
-------------

1. **Identify the problem** in the existing code.
2. **Propose a robust solution** to ensure **fault-tolerant task reassignment**.
3. **Modify the existing code** to fix the bug.
4. **Ensure the new implementation supports dynamic task recovery.**
5. **Use Redis to store checkpoints so that tasks from failed nodes can be resumed by other nodes** (one possible shape is sketched after this list).
6. **Implement a distributed health-check mechanism to detect node failures.**
7. **Ensure high performance by using asynchronous task management and efficient inter-node communication.**
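As a starting point for items 5 and 6, one possible shape (a hedged sketch under assumptions; the key names, TTL, and helper names are illustrative, not a prescribed design) is to have every node refresh a heartbeat key with a TTL and write checkpoints under per-task keys, so that any surviving node can spot an expired peer and resume its work:

```python
import json
import time

import redis

r = redis.Redis(host="localhost", port=6379, db=0)
HEARTBEAT_TTL = 15  # seconds; a node is presumed dead once its key expires


def heartbeat(node_id: str) -> None:
    """Refresh this node's liveness key; each node calls this on a short interval."""
    r.set(f"node:alive:{node_id}", int(time.time()), ex=HEARTBEAT_TTL)
    r.sadd("nodes:known", node_id)


def checkpoint(task_id: str, state: dict) -> None:
    """Persist intermediate task state so another node can resume, not restart, the task."""
    r.set(f"task:checkpoint:{task_id}", json.dumps(state))


def detect_and_recover() -> None:
    """Requeue the in-flight tasks of any node whose heartbeat has expired."""
    for raw in r.smembers("nodes:known"):
        node_id = raw.decode()
        if r.exists(f"node:alive:{node_id}"):
            continue  # node is still heartbeating
        # Node looks dead: push each of its in-flight tasks back onto the shared queue.
        while (task_data := r.rpop(f"tasks:processing:{node_id}")) is not None:
            r.rpush("tasks", task_data)
        r.srem("nodes:known", node_id)
```

In a real deployment the recovery pass would also take a short-lived lock (for example with `SET ... NX`) so that two surviving nodes do not requeue the same tasks twice.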
**Additional Requirements**
---------------------------

* The solution should be **scalable** and **efficient**.
* It must use **asynchronous programming** effectively.
* It should handle **node failures gracefully**.
* The task queue should support **preemption**, meaning high-priority tasks can displace lower-priority ones (a minimal sorted-set sketch follows this list).
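For the preemption requirement, one option (again only a sketch; the `tasks:prio` key and helper names are assumptions) is to replace the plain Redis list with a sorted set keyed by priority, so the highest-priority task is always dequeued first and a newly arrived urgent task naturally jumps ahead of queued low-priority work:

```python
import json

import redis

r = redis.Redis(host="localhost", port=6379, db=0)


def submit(task_id: str, priority: int, payload=None) -> None:
    """Enqueue a task; higher score = more urgent, and ZADD keeps the set ordered."""
    member = json.dumps({"task_id": task_id, "payload": payload or {}})
    r.zadd("tasks:prio", {member: priority})


def next_task():
    """Pop the single highest-priority task, or return None when the queue is empty."""
    popped = r.zpopmax("tasks:prio", 1)  # [(member_bytes, score)] or []
    if not popped:
        return None
    member, score = popped[0]
    return json.loads(member), score
```

Displacing a task that is already running goes further than queue ordering: the worker would also have to poll a preemption flag (for example a per-task Redis key) at safe points and checkpoint before yielding, which is left to the full solution.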
**Evaluation Criteria**
-----------------------

1. **Correctness** – Does the model correctly identify the issue and propose a valid fix?
2. **Code Quality** – Is the solution well-structured and idiomatic?
3. **Scalability** – Does the fix account for a distributed system with multiple nodes?
4. **Long-Context Handling** – Does the model utilize all relevant details from the 2048-token prompt?
5. **Efficiency** – Is the revised system optimized for high-throughput execution?