Skip to content

Instantly share code, notes, and snippets.

@xethorn
Created February 24, 2015 19:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xethorn/62695a072bb4f15726fd to your computer and use it in GitHub Desktop.
Save xethorn/62695a072bb4f15726fd to your computer and use it in GitHub Desktop.
Garcon - Original Boto Example
# serial_worker.py
import time
import boto.swf.layer2 as swf
class MyBaseWorker(swf.ActivityWorker):
    """Base activity worker: poll once and run a single activity.

    Subclasses set ``task_list`` and override :meth:`activity`.
    """

    domain = 'boto_tutorial'
    version = '1.0'
    # Overridden by each subclass with the task list it works on.
    task_list = None

    def run(self):
        """Poll once and process the activity task, if one was returned.

        Returns:
            True when the poll response contained an ``activityId`` and
            ``activity`` finished without raising; None when the response
            contained no ``activityId``.

        Raises:
            Exception: whatever ``activity`` raised, re-raised after
                ``self.fail`` has been called with the error message.
        """
        activity_task = self.poll()
        if 'activityId' not in activity_task:
            # Nothing to work on in this poll response.
            return None

        try:
            print('working on activity from tasklist %s at %i'
                  % (self.task_list, time.time()))
            self.activity(activity_task.get('input'))
        except Exception as error:
            # Call fail() with the reason first, then let the error
            # propagate. A bare ``raise`` keeps the original traceback,
            # which ``raise error`` would discard on Python 2.
            self.fail(reason=str(error))
            raise
        return True

    def activity(self, activity_input):
        """Perform the work for one task; subclasses must override this.

        Args:
            activity_input: the ``input`` value of the polled task, or
                None when the task has no ``input`` key.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class WorkerA(MyBaseWorker):
    """Worker bound to the ``a_tasks`` task list."""

    task_list = 'a_tasks'

    def activity(self, activity_input):
        """Complete the task with a fixed result string; input is unused."""
        outcome = "Now don't be givin him sambuca!"
        self.complete(result=outcome)
class WorkerB(MyBaseWorker):
    """Worker bound to the ``b_tasks`` task list."""

    task_list = 'b_tasks'

    def activity(self, activity_input):
        """Complete the task without passing a result; input is unused."""
        self.complete()
class WorkerC(MyBaseWorker):
    """Worker bound to the ``c_tasks`` task list."""

    task_list = 'c_tasks'

    def activity(self, activity_input):
        """Complete the task without passing a result; input is unused."""
        self.complete()
# serial_decider.py
import time
import boto.swf.layer2 as swf
class SerialDecider(swf.Decider):
    """Decider that schedules ActivityA, ActivityB and ActivityC in series."""

    domain = 'boto_tutorial'
    task_list = 'default_tasks'
    version = '1.0'

    def run(self):
        """Poll once and answer the decision task, if one was returned.

        The decision is based on the most recent non-decision event:

        * ``WorkflowExecutionStarted``: schedule ActivityA.
        * ``ActivityTaskCompleted``: schedule the next activity, passing the
          completed activity's result as input, or complete the workflow
          once ActivityC has finished.
        * anything else: respond with an empty decision list.

        Returns:
            True when the poll response contained ``events`` and
            ``self.complete`` was called; None otherwise.
        """
        history = self.poll()
        if 'events' not in history:
            return None

        # Get a list of non-decision events to see what event came in last.
        workflow_events = [e for e in history['events']
                           if not e['eventType'].startswith('Decision')]
        decisions = swf.Layer1Decisions()

        # Record latest non-decision event.
        last_event = workflow_events[-1]
        last_event_type = last_event['eventType']

        if last_event_type == 'WorkflowExecutionStarted':
            # Schedule the first activity.
            decisions.schedule_activity_task(
                '%s-%i' % ('ActivityA', time.time()),
                'ActivityA', self.version, task_list='a_tasks')
        elif last_event_type == 'ActivityTaskCompleted':
            # Decide based on the name of the activity that just completed.
            completed_attrs = last_event[
                'activityTaskCompletedEventAttributes']

            # 1) Locate the event that scheduled the completed activity.
            # NOTE(review): indexing by ``scheduledEventId - 1`` assumes
            # history['events'] starts at event id 1 and is in ascending
            # order -- confirm this holds if histories are ever paginated.
            scheduled_index = completed_attrs['scheduledEventId'] - 1
            scheduled_event = history['events'][scheduled_index]

            # 2) Extract the activity's name.
            scheduled_attrs = scheduled_event[
                'activityTaskScheduledEventAttributes']
            activity_name = scheduled_attrs['activityType']['name']

            # 3) Optionally, get the result from the activity.
            result = completed_attrs.get('result')

            # Take the decision; the branches are mutually exclusive.
            if activity_name == 'ActivityA':
                decisions.schedule_activity_task(
                    '%s-%i' % ('ActivityB', time.time()),
                    'ActivityB', self.version, task_list='b_tasks',
                    input=result)
            elif activity_name == 'ActivityB':
                decisions.schedule_activity_task(
                    '%s-%i' % ('ActivityC', time.time()),
                    'ActivityC', self.version, task_list='c_tasks',
                    input=result)
            elif activity_name == 'ActivityC':
                # Final activity completed. We're done.
                decisions.complete_workflow_execution()

        self.complete(decisions=decisions)
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment