Skip to content

Instantly share code, notes, and snippets.

@velp
Last active August 20, 2018 10:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save velp/20ca56935f7bf0a9685c45b91614daad to your computer and use it in GitHub Desktop.
Save velp/20ca56935f7bf0a9685c45b91614daad to your computer and use it in GitHub Desktop.
Example of use 'taskflow' module
# python ./taskflow-example.py
call AllocateSubnet.execute(client=some_client)
call CreateSubnet.execute(client=new_client, subnet_id={'id': '111222333'})
call PrintSubnet.execute(client=some_client, subnet={'subnet': {'id': {'id': '111222333'}, 'name': 'test'}})
revert PrintSubnet task with kwargs={'subnet': {'subnet': {'id': {'id': '111222333'}, 'name': 'test'}}, 'result': None, 'flow_failures': {'__main__.ErrorTask': <taskflow.types.failure.Failure object at 0x7f07b4eac910>}}
revert CreateSubnet task with kwargs={'result': {'subnet': {'id': {'id': '111222333'}, 'name': 'test'}}, 'flow_failures': {'__main__.ErrorTask': <taskflow.types.failure.Failure object at 0x7f07b4eac910>}}
revert AllocateSubnet task with kwargs={'result': {'id': '111222333'}, 'flow_failures': {'__main__.ErrorTask': <taskflow.types.failure.Failure object at 0x7f07b4eac910>}}
Traceback (most recent call last):
  File "./test-task-flow.py", line 61, in <module>
    flow_engine.run()
  File "/usr/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py", line 247, in run
    for _state in self.run_iter(timeout=timeout):
  File "/usr/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py", line 340, in run_iter
    failure.Failure.reraise_if_any(er_failures)
  File "/usr/local/lib/python2.7/site-packages/taskflow/types/failure.py", line 339, in reraise_if_any
    failures[0].reraise()
  File "/usr/local/lib/python2.7/site-packages/taskflow/types/failure.py", line 346, in reraise
    six.reraise(*self._exc_info)
  File "/usr/local/lib/python2.7/site-packages/taskflow/engines/action_engine/executor.py", line 53, in _execute_task
    result = task.execute(**arguments)
  File "./test-task-flow.py", line 44, in execute
    raise Exception("test raise")
Exception: test raise
from taskflow.task import Task
from taskflow.patterns.linear_flow import Flow as LinearFlow
from taskflow.engines import load
class AllocateSubnet(Task):
default_provides = 'subnet_id'
def execute(self, client):
print("call AllocateSubnet.execute(client=%s)" % client)
return {"id": "111222333"}
def revert(self, **kwargs):
print("revert AllocateSubnet task with kwargs=%s" % kwargs)
class CreateSubnet(Task):
default_provides = 'subnet_obj'
def execute(self, client, subnet_id):
print("call CreateSubnet.execute(client=%s, subnet_id=%s)"
% (client, subnet_id))
return {"subnet": {"id": subnet_id, "name": "test"}}
def revert(self, **kwargs):
print("revert CreateSubnet task with kwargs=%s" % kwargs)
class PrintSubnet(Task):
def execute(self, client, subnet):
print("call PrintSubnet.execute(client=%s, subnet=%s)"
% (client, subnet))
def revert(self, **kwargs):
print("revert PrintSubnet task with kwargs=%s" % kwargs)
class ErrorTask(Task):
def execute(self, client):
raise Exception("test raise")
def create_subnet_flow():
flow = LinearFlow("Test flow 1")
flow.add(AllocateSubnet())
# change data in argument 'client' to 'new_client' for this task
flow.add(CreateSubnet(inject={"client": "new_client"}))
# change requeres task's argument from 'subnet' to 'subnet_obj'
flow.add(PrintSubnet(rebind={"subnet": "subnet_obj"}))
# example raise error
flow.add(ErrorTask())
return load(flow, store=dict(client="some_client"))
if __name__ == "__main__":
flow_engine = create_subnet_flow()
flow_engine.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment