Last active
April 29, 2018 13:48
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python module for AWS Step Functions | |
""" | |
import copy | |
from functools import reduce | |
import json | |
def to_d(x): | |
if isinstance(x, SfnBase): | |
return x.to_dict() | |
elif isinstance(x, list): | |
return [to_d(y) for y in x] | |
elif isinstance(x, dict): | |
return {k: to_d(x[k]) for k in x if x[k] is not None} | |
return x | |
def merge_dict(d0, d1): | |
d = copy.copy(d0) | |
d.update(d1) | |
return d | |
class SfnBase(object): | |
def __init__(self): | |
pass | |
class StateMachine(SfnBase): | |
""" | |
Example: | |
>>> hwp = Pass("HelloWorld", Result="Hello World!", End=True) | |
>>> hwsm = StateMachine(StartAt=hwp.Name, Comment="A Hello World example of the Amazon States Language using a Pass state", States=[hwp]) | |
{'Comment': 'A Hello World example of the Amazon States Language using a Pass state', | |
'StartAt': 'HelloWorld', | |
'States': {'HelloWorld': {'End': True, | |
'Result': 'Hello World!', | |
'Type': 'Pass'}}} | |
""" | |
def __init__(self, StartAt, States, Comment=None, TimeoutSeconds=None, version=None): | |
self.StartAt = StartAt | |
self.Comment = Comment | |
self.States = States | |
self.TimeoutSeconds = TimeoutSeconds | |
self.Version = version | |
def to_dict(self): | |
d = vars(self) | |
res = {k: to_d(d[k]) for k in d if d[k] is not None} | |
res['States'] = reduce(merge_dict, res['States'], {}) | |
return res | |
def to_json(self, **kwargs): | |
return json.dumps(self.to_dict(), **kwargs) | |
# States | |
class SfnState(SfnBase): | |
def __init__(self, Next=None, End=None, Comment=None, InputPath=None, OutputPath=None): | |
self.Next = Next | |
self.End = End | |
# Optional | |
self.Comment = Comment | |
self.InputPath = InputPath | |
self.OutputPath = OutputPath | |
def _to_dict(self, name): | |
d = vars(self) | |
res = {k: to_d(d[k]) for k in d if d[k] is not None and k != 'Name'} | |
res['Type'] = self.__class__.__name__ | |
return {name: res} | |
class Pass(SfnState): | |
""" | |
Example: | |
>>> p = Pass('no-op', | |
Result={'x-datum': 0.38, 'y-datum': 622.22}, ResultPath='$.coords', Next='End') | |
>>> p.to_dict() | |
{'no-op': {'Next': 'End', | |
'Result': {'x-datum': 0.38, 'y-datum': 622.22}, | |
'ResultPath': '$.coords'}} | |
""" | |
def __init__(self, Name, Result=None, ResultPath=None, **kwargs): | |
super(Pass, self).__init__(**kwargs) | |
self.Name = Name | |
self.Result = Result | |
self.ResultPath = ResultPath | |
for k, v in kwargs.items(): | |
setattr(self, k, v) | |
def to_dict(self): | |
return super(Pass, self)._to_dict(self.Name) | |
class Task(SfnState): | |
""" | |
Example: | |
>>> t = Task('ActivityState', | |
Resource='arn:aws:states:us-east-1:123456789012:activity:HelloWorld', | |
TimeoutSeconds=300, HeartbeatSeconds=60, Next='NextState') | |
>>> t.to_dict() | |
{'ActivityState': {'HeartbeatSeconds': 60, | |
'Next': 'NextState', | |
'Resource': 'arn:aws:states:us-east-1:123456789012:activity:HelloWorld', | |
'TimeoutSeconds': 300, | |
'Type': 'Task'}} | |
""" | |
def __init__(self, Name, Resource, ResultPath=None, Retry=None, Catch=None, | |
TimeoutSeconds=None, HeartbeatSeconds=None, **kwargs): | |
super(Task, self).__init__(**kwargs) | |
self.Name = Name | |
self.Resource = Resource | |
self.ResultPath = ResultPath | |
self.Retry = Retry | |
self.Catch = Catch | |
self.TimeoutSeconds = TimeoutSeconds | |
self.HeartbeatSeconds = HeartbeatSeconds | |
def to_dict(self): | |
return super(Task, self)._to_dict(self.Name) | |
class Choice(SfnState): | |
""" | |
Example: | |
>>>c = Choice("ChoiceStateX", Choices=[ | |
{ | |
"Not": { | |
"Variable": "$.type", | |
"StringEquals": "Private" | |
}, | |
"Next": "Public" | |
}, | |
{ | |
"Variable": "$.value", | |
"NumericEquals": 0, | |
"Next": "ValueIsZero" | |
}, | |
{ | |
"And": [ | |
{ | |
"Variable": "$.value", | |
"NumericGreaterThanEquals": 20 | |
}, | |
{ | |
"Variable": "$.value", | |
"NumericLessThan": 30 | |
} | |
], | |
"Next": "ValueInTwenties" | |
} | |
], Default="DefaultState") | |
>>> c.to_dict() | |
{'ChoiceStateX': {'Choices': [{'Next': 'Public', | |
'Not': {'StringEquals': 'Private', 'Variable': '$.type'}}, | |
{'Next': 'ValueIsZero', 'NumericEquals': 0, 'Variable': '$.value'}, | |
{'And': [{'NumericGreaterThanEquals': 20, 'Variable': '$.value'}, | |
{'NumericLessThan': 30, 'Variable': '$.value'}], | |
'Next': 'ValueInTwenties'}], | |
'Default': 'DefaultState', | |
'Type': 'Choice'}} | |
""" | |
def __init__(self, Name, Choices, Default=None, **kwargs): | |
super(Choice, self).__init__(**kwargs) | |
self.Name = Name | |
self.Choices = Choices | |
self.Default = Default | |
def to_dict(self): | |
return super(Choice, self)._to_dict(self.Name) | |
class Wait(SfnState): | |
""" | |
>>> w = Wait("wait_ten_seconds", Seconds=10, Next="NextState") | |
>>> w | |
{'wait_ten_seconds': {'Next': 'NextState', 'Seconds': 10, 'Type': 'Wait'}} | |
""" | |
def __init__(self, Name, Seconds=None, Timestamp=None, SecondsPath=None, | |
TimestampPath=None, **kwargs): | |
super(Wait, self).__init__(**kwargs) | |
self.Name = Name | |
self.Seconds = Seconds | |
self.Timestamp = Timestamp | |
self.SecondsPath = SecondsPath | |
self.TimestampPath = TimestampPath | |
def to_dict(self): | |
return super(Wait, self)._to_dict(self.Name) | |
class Succeed(SfnState): | |
def __init__(self, Name): | |
self.Name = Name | |
def to_dict(self): | |
return super(Succeed, self)._to_dict(self.Name) | |
class Fail(SfnState): | |
""" | |
Example: | |
>>> f = Fail("FailState", Cause="Invalid response.", Error="ErrorA") | |
>>> f.to_dict() | |
{'FailState': {'Cause': 'Invalid response.', | |
'Error': 'ErrorA', | |
'Type': 'Fail'}} | |
""" | |
def __init__(self, Name, Cause=None, Error=None): | |
self.Name = Name | |
self.Cause = Cause | |
self.Error = Error | |
def to_dict(self): | |
return super(Fail, self)._to_dict(self.Name) | |
class Parallel(SfnState): | |
""" | |
Example: | |
>>> lua = Task('LookupAddress', Resource="arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", End=True) | |
>>> lup = Task('LookupPhone', Resource="arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", End=True) | |
>>> sm0 = StateMachine(StartAt=lua.Name, States=[lua]) | |
>>> sm1 = StateMachine(StartAt=lup.Name, States=[lup]) | |
>>> p = Parallel('LookupCustomerInfo', Branches=[sm0, sm1], Next="NextState") | |
>>> p.to_dict() | |
{'LookupCustomerInfo': {'Branches': [{'StartAt': 'LookupAddress', | |
'States': {'LookupAddress': {'End': True, | |
'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:AddressFinder', | |
'Type': 'Task'}}}, | |
{'StartAt': 'LookupPhone', | |
'States': {'LookupPhone': {'End': True, | |
'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder', | |
'Type': 'Task'}}}], | |
'Next': 'NextState', | |
'Type': 'Parallel'}} | |
""" | |
def __init__(self, Name, Branches, ResultPath=None, Retry=None, Catch=None, **kwargs): | |
super(Parallel, self).__init__(**kwargs) | |
self.Name = Name | |
self.Branches = Branches | |
self.ResultPath = ResultPath | |
self.Retry = Retry | |
self.Catch = Catch | |
def to_dict(self): | |
return super(Parallel, self)._to_dict(self.Name) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment