Skip to content

Instantly share code, notes, and snippets.

@pavloo
Created March 8, 2018 12:53
Show Gist options
  • Save pavloo/7b2ea26bb9cb4c7c95cca24a6675f496 to your computer and use it in GitHub Desktop.
Save pavloo/7b2ea26bb9cb4c7c95cca24a6675f496 to your computer and use it in GitHub Desktop.
def _is_node_rdy(task, graph):
    """Tell whether every predecessor of ``task`` has finished successfully.

    The ids of the task's predecessors are read from ``graph`` and the matching
    ``Task`` rows are loaded through ``session``. A predecessor counts as
    finished only when it has a ``celery_task_uid`` and the Celery result for
    that uid reports the ``SUCCESS`` state.

    Args:
        task: Object exposing an ``id`` that is a node of ``graph``.
        graph: Graph object providing ``predecessors(node_id)``
            (presumably a networkx ``DiGraph`` -- confirm against the
            ``Workflow`` model).

    Returns:
        ``True`` if all loaded predecessor rows are finished (also when there
        are none), ``False`` as soon as one is not.
    """
    predecessor_ids = list(graph.predecessors(task.id))
    dependencies = (
        session.query(Task)
        .filter(Task.id.in_(predecessor_ids))
        .all()
    )
    # Only rows returned by the query are inspected; the check stops at the
    # first predecessor that was never dispatched or has not succeeded yet.
    return all(
        dependency.celery_task_uid
        and AsyncResult(dependency.celery_task_uid).state == SUCCESS
        for dependency in dependencies
    )
@app.task(bind=True)
def run(self, workflow_id, cur_task_id=None):
    """Execute one node of a workflow graph and enqueue the nodes after it.

    Called without ``cur_task_id`` the task only kicks the workflow off: it
    enqueues whatever ``find_entry_point`` returns for the workflow's graph.
    Called with a ``cur_task_id`` it processes that task -- provided
    ``_is_node_rdy`` reports all of its predecessors as finished -- and then
    enqueues the task's successors.

    Args:
        self: The bound Celery task instance (``bind=True``).
        workflow_id: Id of the ``Workflow`` row whose ``execution_graph`` is
            walked.
        cur_task_id: Id of the ``Task`` to process, or ``None`` to start the
            workflow from its entry point(s).

    Returns:
        None.

    Raises:
        LookupError: If ``cur_task_id`` is given but no ``Task`` row with that
            id exists.
    """
    print('Runnning Workflow {} and Task {}'.format(workflow_id, cur_task_id))
    # NOTE(review): with a SQLAlchemy session, ``.one()`` raises when the
    # workflow row is missing or ambiguous -- confirm that is the intent.
    workflow = session.query(Workflow).filter_by(id=workflow_id).one()
    graph = workflow.execution_graph

    # Compare against None explicitly so that a falsy but valid id (e.g. 0)
    # is processed as a task instead of restarting the whole workflow.
    if cur_task_id is not None:
        task = session.query(Task).get(cur_task_id)
        if task is None:
            # Fail with a clear message rather than an AttributeError raised
            # from inside _is_node_rdy.
            raise LookupError(
                'Task {} of Workflow {} does not exist'.format(
                    cur_task_id, workflow_id
                )
            )

        if not _is_node_rdy(task, graph):
            # Some predecessor has not succeeded yet. Presumably the run
            # triggered by the last predecessor to finish will pick this node
            # up, since every run enqueues its successors below.
            return

        # do some business logic specific thing here
        # if it's CI workflow, it would be eg. running tests
        _process_task_node(task, self.request.id)

        next_task_ids = list(graph.successors(cur_task_id))
    else:
        next_task_ids = find_entry_point(graph)

    # Mark this Celery task as successful *before* enqueuing the next ones:
    # each successor checks its predecessors' state (see _is_node_rdy) and may
    # start running before this task has returned and had its state updated
    # automatically.
    self.update_state(state=SUCCESS)

    for task_id in next_task_ids:
        run.apply_async(
            args=(workflow_id, task_id,),
            queue=QUEUE_NAME
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment