Skip to content

Instantly share code, notes, and snippets.

@mikepii
Created July 2, 2015 21:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikepii/b530736a83b8d6de420e to your computer and use it in GitHub Desktop.
Save mikepii/b530736a83b8d6de420e to your computer and use it in GitHub Desktop.
luigi-swf WaitForExternalTasks base class
class WaitForExternalTasks(luigi.Task, SwfHeartbeatCancel):
    """Waits for a task in another workflow to complete.

    Polls for the `complete` method on :meth:`wait_for`'s result
    to return `True`.

    Example of common parameters to override::

        timeout = 10 * hours
        check_interval = 1 * minutes

    Parameters to override:

    * `timeout`: After how long to raise an exception in seconds.
      Note that the total time this might wait is actually
      (`swf_retries` + 1) * `timeout`. (default 21 min)
    * `check_interval`: Interval between checks in seconds. (default 20s)
      Each sleep is jittered by up to +/- 3 seconds and never goes
      below zero.
    * `swf_heartbeat_check_factor`: A heartbeat is sent once every this
      many checks. Also used to calculate the SWF heartbeat timeout in
      seconds = `check_interval` * `swf_heartbeat_check_factor`
      + 2 minutes. (default 3)
    * `swf_start_to_close_timeout_buffer`: Used to calculate
      `swf_start_to_close_timeout` = `timeout` +
      `swf_start_to_close_timeout_buffer`. (default 30s)
    * `swf_retries`: Works as is usual in `luigi-swf`. (default 2)
    """

    timeout = 21 * minutes
    check_interval = 20 * seconds
    swf_heartbeat_check_factor = 3
    swf_start_to_close_timeout_buffer = 30 * seconds
    swf_retries = 2

    @property
    def swf_start_to_close_timeout(self):
        """Return `timeout` plus `swf_start_to_close_timeout_buffer`."""
        return self.timeout + self.swf_start_to_close_timeout_buffer

    @property
    def swf_task_list(self):
        """Return the `swf_task_list` of the first awaited task that has one.

        Returns `None` when no awaited task defines `swf_task_list`.
        """
        for task in self._wait_for_flat():
            if hasattr(task, 'swf_task_list'):
                return task.swf_task_list
        return None

    @property
    def swf_heartbeat_timeout(self):
        """Return `check_interval` * `swf_heartbeat_check_factor` + 2 minutes."""
        return (self.check_interval * self.swf_heartbeat_check_factor +
                2 * minutes)

    def complete(self):
        """Return `True` only if every awaited task reports completion.

        Stops at the first incomplete task, so later tasks are not queried.
        """
        return all(task.complete() for task in self._wait_for_flat())

    def wait_for(self):
        """Please override this and return a task or list of tasks."""
        raise NotImplementedError

    def _wait_for_flat(self):
        """Return :meth:`wait_for`'s result as an iterable of tasks.

        A single (non-iterable) task is wrapped in a one-element list;
        an iterable result is returned unchanged.
        """
        # `collections.Iterable` was removed in Python 3.10; the ABCs live
        # in `collections.abc`. The fallback keeps Python 2 working.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        wait_for = self.wait_for()
        # A string is iterable but is never a valid task collection.
        assert not isinstance(wait_for, string_types)
        if isinstance(wait_for, Iterable):
            return wait_for
        return [wait_for]

    def run(self):
        """Poll until the awaited tasks complete, cancel is requested,
        or `timeout` elapses.

        Returns normally on completion or after acknowledging a cancel
        request.

        Raises:
            RuntimeError: if `timeout` seconds pass without completion.
        """
        start_t = datetime.datetime.utcnow()
        elapsed_t = 0 * seconds
        hb_i = 0
        # Small random initial delay (presumably so that several waiters
        # started together do not poll in lock-step).
        sleep(random.uniform(0, 2))
        while elapsed_t <= self.timeout:
            # Heartbeat on the first check and then once every
            # `swf_heartbeat_check_factor` checks.
            if hb_i == 0:
                self.heartbeat()
            hb_i = (hb_i + 1) % self.swf_heartbeat_check_factor
            if self.cancel_requested:
                self.ack_cancel()
                return
            if self.complete():
                return
            # Jitter the interval, but clamp at zero: with a
            # `check_interval` below 3 seconds the jittered value can go
            # negative, and sleep() rejects negative durations.
            sleep(max(0, self.check_interval + random.uniform(-3, 3)))
            elapsed_t = (datetime.datetime.utcnow() - start_t).total_seconds()
        raise RuntimeError("Timeout exceeded")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment