Tutorial code copied from the Boto tutorial.
Created
February 24, 2015 19:07
-
-
Save xethorn/62695a072bb4f15726fd to your computer and use it in GitHub Desktop.
Garcon - Original Boto Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# serial_worker.py | |
import time | |
import boto.swf.layer2 as swf | |
class MyBaseWorker(swf.ActivityWorker):
    """Base activity worker for the ``boto_tutorial`` domain.

    Concrete workers set ``task_list`` and override :meth:`activity`.
    """

    domain = 'boto_tutorial'
    version = '1.0'
    # Overridden by each concrete worker; None on the base class.
    task_list = None

    def run(self):
        """Poll once for an activity task and, if one arrived, execute it.

        Returns:
            True when a task was received and :meth:`activity` returned
            without raising; None when the poll response carried no
            ``activityId``.

        Raises:
            Exception: whatever :meth:`activity` raised, re-raised after the
                task has been reported as failed via ``self.fail``.
        """
        # NOTE(review): block nesting was reconstructed from the upstream boto
        # tutorial (the scraped text had lost its indentation) -- confirm that
        # ``return True`` belongs to the "task received" path only.
        activity_task = self.poll()
        if 'activityId' not in activity_task:
            return None

        try:
            # Parenthesised single-argument form prints the same text on
            # Python 2 and Python 3.
            print('working on activity from tasklist %s at %i'
                  % (self.task_list, time.time()))
            self.activity(activity_task.get('input'))
        except Exception as error:
            self.fail(reason=str(error))
            # Bare ``raise`` keeps the original traceback (``raise error``
            # would restart it on Python 2).
            raise
        return True

    def activity(self, activity_input):
        """Perform the work for one task; must be overridden by subclasses.

        Args:
            activity_input: the ``input`` value of the polled task, or None
                when the task has none.

        Raises:
            NotImplementedError: always, on the base class.
        """
        raise NotImplementedError
class WorkerA(MyBaseWorker):
    """Worker bound to the ``a_tasks`` task list."""

    task_list = 'a_tasks'

    def activity(self, activity_input):
        """Complete the task with a fixed result string; the input is unused."""
        self.complete(result="Now don't be givin him sambuca!")
class WorkerB(MyBaseWorker):
    """Worker bound to the ``b_tasks`` task list."""

    task_list = 'b_tasks'

    def activity(self, activity_input):
        """Complete the task without a result; the input is unused."""
        self.complete()
class WorkerC(MyBaseWorker):
    """Worker bound to the ``c_tasks`` task list."""

    task_list = 'c_tasks'

    def activity(self, activity_input):
        """Complete the task without a result; the input is unused."""
        self.complete()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# serial_decider.py | |
import time | |
import boto.swf.layer2 as swf | |
class SerialDecider(swf.Decider):
    """Decider that schedules ActivityA, ActivityB and ActivityC in sequence.

    Each activity is scheduled when the previous one completes; the workflow
    execution is completed once ActivityC has finished.
    """

    domain = 'boto_tutorial'
    task_list = 'default_tasks'
    version = '1.0'

    @staticmethod
    def _event_by_id(events, event_id):
        """Return the event in *events* whose ``eventId`` equals *event_id*.

        Raises:
            IndexError: if no such event is present. IndexError is used
                because the positional lookup this replaces failed the same
                way when the event was out of range.
        """
        for event in events:
            if event.get('eventId') == event_id:
                return event
        raise IndexError('event %s not found in decision task history'
                         % event_id)

    def run(self):
        """Poll once for a decision task and answer it.

        Returns:
            True when a decision task was received and answered; None when
            the poll response carried no ``events``.

        Raises:
            IndexError: if the history holds only Decision* events, or the
                scheduled event of a completed activity is not in it.
        """
        # NOTE(review): block nesting was reconstructed from the upstream boto
        # tutorial (the scraped text had lost its indentation) -- confirm that
        # ``self.complete`` / ``return True`` run only when events arrived.
        history = self.poll()
        if 'events' not in history:
            return None

        events = history['events']
        # Skip Decision* events to find the last workflow event that came in.
        workflow_events = [e for e in events
                           if not e['eventType'].startswith('Decision')]
        decisions = swf.Layer1Decisions()
        last_event = workflow_events[-1]
        last_event_type = last_event['eventType']

        if last_event_type == 'WorkflowExecutionStarted':
            # Fresh execution: schedule the first activity.
            decisions.schedule_activity_task(
                '%s-%i' % ('ActivityA', time.time()),
                'ActivityA', self.version, task_list='a_tasks')
        elif last_event_type == 'ActivityTaskCompleted':
            completed_attrs = last_event['activityTaskCompletedEventAttributes']
            # Find the scheduling event by its id rather than by list
            # position, so the lookup does not depend on the history being a
            # complete, in-order page that starts at event 1.
            scheduled_event = self._event_by_id(
                events, completed_attrs['scheduledEventId'])
            scheduled_attrs = scheduled_event[
                'activityTaskScheduledEventAttributes']
            activity_name = scheduled_attrs['activityType']['name']
            # The finished activity's result (if any) feeds the next one.
            result = completed_attrs.get('result')

            if activity_name == 'ActivityA':
                decisions.schedule_activity_task(
                    '%s-%i' % ('ActivityB', time.time()),
                    'ActivityB', self.version, task_list='b_tasks',
                    input=result)
            elif activity_name == 'ActivityB':
                decisions.schedule_activity_task(
                    '%s-%i' % ('ActivityC', time.time()),
                    'ActivityC', self.version, task_list='c_tasks',
                    input=result)
            elif activity_name == 'ActivityC':
                # Final activity completed. We're done.
                decisions.complete_workflow_execution()

        # Any other event type is answered with an empty decision list.
        self.complete(decisions=decisions)
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment