Skip to content

Instantly share code, notes, and snippets.

@sagar8192
Created August 23, 2017 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sagar8192/bb98d0b479b3cf1afcf09785529471fd to your computer and use it in GitHub Desktop.
Save sagar8192/bb98d0b479b3cf1afcf09785529471fd to your computer and use it in GitHub Desktop.
diff --git a/task_processing/plugins/mesos/retrying_executor.py b/task_processing/plugins/mesos/retrying_executor.py
index 689a47b..5fe443f 100644
--- a/task_processing/plugins/mesos/retrying_executor.py
+++ b/task_processing/plugins/mesos/retrying_executor.py
@@ -1,5 +1,6 @@
import logging
import time
+import uuid
from operator import sub
from threading import Lock
from threading import Thread
@@ -13,15 +14,19 @@ log = logging.getLogger(__name__)
class RetryingExecutor(TaskExecutor):
- def __init__(self,
- executor,
- retry_pred=lambda e: not e.success,
- retries=3):
+ def __init__(
+ self,
+ executor,
+ retry_pred=lambda e: not e.success,
+ retries=3
+ ):
self.executor = executor
self.retries = retries
self.retry_pred = retry_pred
self.task_retries = m()
+ # Key=new_task_id, Value=old_task_id
+ self.task_id_mappings = m()
self.task_retries_lock = Lock()
self.src_queue = executor.get_event_queue()
@@ -46,14 +51,20 @@ class RetryingExecutor(TaskExecutor):
if current_retries <= 0:
return False
+ current_attempt = 1 + self.retries - current_retries
+
log.info(
'Retrying task {}, {} of {}, fail event: {}'.format(
- event.task_config.name, 1 + self.retries - current_retries,
+ event.task_config.name, current_attempt,
self.retries, event.raw
)
)
- self.run(event.task_config)
+ self.run(
+ task_config=event.task_config,
+ attempt=current_attempt
+ )
+
with self.task_retries_lock:
self.task_retries = self.task_retries.update_with(
sub, {event.task_id: 1}
@@ -64,6 +75,9 @@ class RetryingExecutor(TaskExecutor):
while True:
while not self.src_queue.empty():
e = self.src_queue.get()
+ # This syntax is probably wrong.
+ # Restore the original task_id
+ e.task_id = self.task_id_mappings[e.task_id]
if e.kind != 'task':
self.dest_queue.put(e)
@@ -80,18 +94,24 @@ class RetryingExecutor(TaskExecutor):
self.task_retries = \
self.task_retries.remove(e.task_id)
- self.dest_queue.put(e)
+ # I feel that we should only propagate events if we have
+ # exhausted all the retries.
+ self.dest_queue.put(e)
if self.stopping:
return
time.sleep(1)
- def run(self, task_config):
- if task_config.task_id not in self.task_retries:
- with self.task_retries_lock:
- self.task_retries = self.task_retries.set(
- task_config.task_id, self.retries)
+ def run(self, task_config, attempt=0):
+ new_task_id = self.generate_new_task_id(task_config.task_id)
+ with self.task_retries_lock:
+ self.task_retries = self.task_retries.set(
+ task_config.task_id,
+ attempt
+ )
+ # update the task_id
+ task_config.task_id = new_task_id
self.executor.run(task_config)
def kill(self, task_id):
@@ -108,3 +128,12 @@ class RetryingExecutor(TaskExecutor):
def get_event_queue(self):
return self.dest_queue
+
+ def generate_new_task_id(self, task_id):
+ new_task_id = uuid.uuid4()
+ with self.task_retries_lock:
+ self.task_id_mappings = self.task_retries.set(
+ new_task_id,
+ task_id
+ )
+ return new_task_id
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment