Skip to content

Instantly share code, notes, and snippets.

@Y4suyuki
Last active April 29, 2018 13:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Y4suyuki/a2e9951f8acb9fd07bff5b521c7aa8c0 to your computer and use it in GitHub Desktop.
Save Y4suyuki/a2e9951f8acb9fd07bff5b521c7aa8c0 to your computer and use it in GitHub Desktop.
"""
Python module for AWS Step Functions
"""
import copy
from functools import reduce
import json
def to_d(x):
if isinstance(x, SfnBase):
return x.to_dict()
elif isinstance(x, list):
return [to_d(y) for y in x]
elif isinstance(x, dict):
return {k: to_d(x[k]) for k in x if x[k] is not None}
return x
def merge_dict(d0, d1):
d = copy.copy(d0)
d.update(d1)
return d
class SfnBase(object):
def __init__(self):
pass
class StateMachine(SfnBase):
"""
Example:
>>> hwp = Pass("HelloWorld", Result="Hello World!", End=True)
>>> hwsm = StateMachine(StartAt=hwp.Name, Comment="A Hello World example of the Amazon States Language using a Pass state", States=[hwp])
{'Comment': 'A Hello World example of the Amazon States Language using a Pass state',
'StartAt': 'HelloWorld',
'States': {'HelloWorld': {'End': True,
'Result': 'Hello World!',
'Type': 'Pass'}}}
"""
def __init__(self, StartAt, States, Comment=None, TimeoutSeconds=None, version=None):
self.StartAt = StartAt
self.Comment = Comment
self.States = States
self.TimeoutSeconds = TimeoutSeconds
self.Version = version
def to_dict(self):
d = vars(self)
res = {k: to_d(d[k]) for k in d if d[k] is not None}
res['States'] = reduce(merge_dict, res['States'], {})
return res
def to_json(self, **kwargs):
return json.dumps(self.to_dict(), **kwargs)
# States
class SfnState(SfnBase):
def __init__(self, Next=None, End=None, Comment=None, InputPath=None, OutputPath=None):
self.Next = Next
self.End = End
# Optional
self.Comment = Comment
self.InputPath = InputPath
self.OutputPath = OutputPath
def _to_dict(self, name):
d = vars(self)
res = {k: to_d(d[k]) for k in d if d[k] is not None and k != 'Name'}
res['Type'] = self.__class__.__name__
return {name: res}
class Pass(SfnState):
"""
Example:
>>> p = Pass('no-op',
Result={'x-datum': 0.38, 'y-datum': 622.22}, ResultPath='$.coords', Next='End')
>>> p.to_dict()
{'no-op': {'Next': 'End',
'Result': {'x-datum': 0.38, 'y-datum': 622.22},
'ResultPath': '$.coords'}}
"""
def __init__(self, Name, Result=None, ResultPath=None, **kwargs):
super(Pass, self).__init__(**kwargs)
self.Name = Name
self.Result = Result
self.ResultPath = ResultPath
for k, v in kwargs.items():
setattr(self, k, v)
def to_dict(self):
return super(Pass, self)._to_dict(self.Name)
class Task(SfnState):
"""
Example:
>>> t = Task('ActivityState',
Resource='arn:aws:states:us-east-1:123456789012:activity:HelloWorld',
TimeoutSeconds=300, HeartbeatSeconds=60, Next='NextState')
>>> t.to_dict()
{'ActivityState': {'HeartbeatSeconds': 60,
'Next': 'NextState',
'Resource': 'arn:aws:states:us-east-1:123456789012:activity:HelloWorld',
'TimeoutSeconds': 300,
'Type': 'Task'}}
"""
def __init__(self, Name, Resource, ResultPath=None, Retry=None, Catch=None,
TimeoutSeconds=None, HeartbeatSeconds=None, **kwargs):
super(Task, self).__init__(**kwargs)
self.Name = Name
self.Resource = Resource
self.ResultPath = ResultPath
self.Retry = Retry
self.Catch = Catch
self.TimeoutSeconds = TimeoutSeconds
self.HeartbeatSeconds = HeartbeatSeconds
def to_dict(self):
return super(Task, self)._to_dict(self.Name)
class Choice(SfnState):
"""
Example:
>>>c = Choice("ChoiceStateX", Choices=[
{
"Not": {
"Variable": "$.type",
"StringEquals": "Private"
},
"Next": "Public"
},
{
"Variable": "$.value",
"NumericEquals": 0,
"Next": "ValueIsZero"
},
{
"And": [
{
"Variable": "$.value",
"NumericGreaterThanEquals": 20
},
{
"Variable": "$.value",
"NumericLessThan": 30
}
],
"Next": "ValueInTwenties"
}
], Default="DefaultState")
>>> c.to_dict()
{'ChoiceStateX': {'Choices': [{'Next': 'Public',
'Not': {'StringEquals': 'Private', 'Variable': '$.type'}},
{'Next': 'ValueIsZero', 'NumericEquals': 0, 'Variable': '$.value'},
{'And': [{'NumericGreaterThanEquals': 20, 'Variable': '$.value'},
{'NumericLessThan': 30, 'Variable': '$.value'}],
'Next': 'ValueInTwenties'}],
'Default': 'DefaultState',
'Type': 'Choice'}}
"""
def __init__(self, Name, Choices, Default=None, **kwargs):
super(Choice, self).__init__(**kwargs)
self.Name = Name
self.Choices = Choices
self.Default = Default
def to_dict(self):
return super(Choice, self)._to_dict(self.Name)
class Wait(SfnState):
"""
>>> w = Wait("wait_ten_seconds", Seconds=10, Next="NextState")
>>> w
{'wait_ten_seconds': {'Next': 'NextState', 'Seconds': 10, 'Type': 'Wait'}}
"""
def __init__(self, Name, Seconds=None, Timestamp=None, SecondsPath=None,
TimestampPath=None, **kwargs):
super(Wait, self).__init__(**kwargs)
self.Name = Name
self.Seconds = Seconds
self.Timestamp = Timestamp
self.SecondsPath = SecondsPath
self.TimestampPath = TimestampPath
def to_dict(self):
return super(Wait, self)._to_dict(self.Name)
class Succeed(SfnState):
def __init__(self, Name):
self.Name = Name
def to_dict(self):
return super(Succeed, self)._to_dict(self.Name)
class Fail(SfnState):
"""
Example:
>>> f = Fail("FailState", Cause="Invalid response.", Error="ErrorA")
>>> f.to_dict()
{'FailState': {'Cause': 'Invalid response.',
'Error': 'ErrorA',
'Type': 'Fail'}}
"""
def __init__(self, Name, Cause=None, Error=None):
self.Name = Name
self.Cause = Cause
self.Error = Error
def to_dict(self):
return super(Fail, self)._to_dict(self.Name)
class Parallel(SfnState):
"""
Example:
>>> lua = Task('LookupAddress', Resource="arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", End=True)
>>> lup = Task('LookupPhone', Resource="arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", End=True)
>>> sm0 = StateMachine(StartAt=lua.Name, States=[lua])
>>> sm1 = StateMachine(StartAt=lup.Name, States=[lup])
>>> p = Parallel('LookupCustomerInfo', Branches=[sm0, sm1], Next="NextState")
>>> p.to_dict()
{'LookupCustomerInfo': {'Branches': [{'StartAt': 'LookupAddress',
'States': {'LookupAddress': {'End': True,
'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:AddressFinder',
'Type': 'Task'}}},
{'StartAt': 'LookupPhone',
'States': {'LookupPhone': {'End': True,
'Resource': 'arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder',
'Type': 'Task'}}}],
'Next': 'NextState',
'Type': 'Parallel'}}
"""
def __init__(self, Name, Branches, ResultPath=None, Retry=None, Catch=None, **kwargs):
super(Parallel, self).__init__(**kwargs)
self.Name = Name
self.Branches = Branches
self.ResultPath = ResultPath
self.Retry = Retry
self.Catch = Catch
def to_dict(self):
return super(Parallel, self)._to_dict(self.Name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment