Last active
June 13, 2016 16:41
-
-
Save robcowie/41e73cf69f5ed53b0dcf2798bb7ce142 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
# One logger per module, named after the module, used by the polling loop.
logger = logging.getLogger(name=__name__)
class NoSuchActivityError(Exception):
    """Raised when a status lookup for an AWS resource comes back empty."""
def poller(status_getter, callback, interval=30):
    """Repeatedly fetch a status and pass it to ``callback`` until told to stop.

    Each iteration calls ``status_getter()`` and hands the result to
    ``callback(status)``. Polling ends (returning ``None``) as soon as the
    callback returns a truthy value; otherwise the loop sleeps ``interval``
    seconds and polls again. There is no overall timeout.

    :param status_getter: zero-argument callable returning the current status
    :param callback: callable taking the status; return truthy to stop polling
    :param interval: seconds to sleep between polls (default 30)
    :raises Exception: anything raised by ``status_getter`` or ``callback`` is
        logged with its traceback and re-raised unchanged
    """
    while True:
        try:
            status = status_getter()
        except Exception:
            # TODO: decide whether transient errors should be retried.
            logger.exception('Unable to get jobflow status')
            raise
        try:
            if callback(status):
                return
        except Exception:
            logger.exception('Error calling polling callback')
            raise
        time.sleep(interval)


# Other aws_utils modules import the generic poller as ``poll_for_status``
# (e.g. ``from aws_utils.status import NoSuchActivityError, poll_for_status``);
# keep that name importable as an alias of ``poller``.
poll_for_status = poller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from aws_utils.status import NoSuchActivityError, poll_for_status | |
import boto3 | |
class NoSuchEMRCluster(Exception):
    """Raised when no EMR cluster carries the requested name."""
class MultipleEMRClusters(Exception):
    """Raised when more than one EMR cluster shares the requested name."""
def get_status(emr_name, region):
    """Connect to AWS and return the state of the EMR cluster named ``emr_name``.

    :param emr_name: (str) name of the cluster
    :param region: (str) AWS region
    :return: (str) current state of the cluster. Possible values: STARTING,
        BOOTSTRAPPING, RUNNING, WAITING, TERMINATING, TERMINATED,
        TERMINATED_WITH_ERRORS
    :raises NoSuchEMRCluster: no cluster has that name
    :raises MultipleEMRClusters: more than one cluster has that name
    :raises NoSuchActivityError: the cluster's state is empty
    """
    cl = boto3.client('emr', region)
    # list_clusters returns results one page at a time; walk every page so a
    # matching cluster beyond the first page is not missed.
    # NOTE(review): list_clusters also reports recently terminated clusters,
    # so re-using a cluster name may raise MultipleEMRClusters; consider a
    # ClusterStates filter if that matters to callers.
    emrs = []
    for page in cl.get_paginator('list_clusters').paginate():
        emrs.extend(c for c in page['Clusters'] if c['Name'] == emr_name)
    if not emrs:
        raise NoSuchEMRCluster(u'No EMR cluster "{}"'.format(emr_name))
    if len(emrs) > 1:
        raise MultipleEMRClusters(u'Multiple EMR clusters called "{}"'.format(emr_name))
    # Use the single match explicitly: a comprehension's loop variable is not
    # visible after the comprehension in Python 3.
    status = emrs[0]['Status']['State']
    if not status:
        raise NoSuchActivityError('The EMR cluster status is empty')
    return status
def poll_for_status(emr_name, terminating_statuses, callback=None,
                    region='us-east-1', interval=30):
    """Poll an EMR cluster's state until a stop condition is met.

    :param emr_name: (str) name of the cluster
    :param terminating_statuses: container of states that end polling
    :param callback: optional callable receiving each state that is not in
        ``terminating_statuses``; returning a truthy value also ends polling
    :param region: (str) AWS region
    :param interval: seconds to wait between polls
    :return: whatever the generic poller returns once polling stops
    """
    # Import the generic loop under its own name: this module's
    # ``poll_for_status`` shadows the one imported from aws_utils.status, so
    # calling ``poll_for_status`` here would recurse into this function.
    from aws_utils.status import poller

    # Combine the terminating statuses and the optional user callback.
    def cb(status):
        if status in terminating_statuses:
            return True
        if callback is None:
            # No user callback: keep polling until a terminating status.
            return False
        return callback(status)

    # Construct the status getter for the cluster.
    status_getter = functools.partial(get_status, emr_name, region)
    return poller(status_getter, cb, interval=interval)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aws_utils.status import NoSuchActivityError | |
import boto3 | |
class NoSuchPipeline(Exception):
    """Raised when no Data Pipeline carries the requested name."""
class MultiplePipelines(Exception):
    """Raised when more than one Data Pipeline shares the requested name."""
def get_status(pipeline_name, region):
    """Connect to AWS and return the state of the Data Pipeline ``pipeline_name``.

    :param pipeline_name: (str) name of the Pipeline
    :param region: (str) AWS region
    :return: (str) current ``@pipelineState`` of the Pipeline. Possible values:
        ACTIVATING, CANCELED, CASCADE_FAILED, DEACTIVATING, FAILED, FINISHED,
        INACTIVE, PAUSED, PENDING, RUNNING, SHUTTING_DOWN, SKIPPED, TIMEDOUT,
        VALIDATING, WAITING_FOR_RUNNER, WAITING_ON_DEPENDENCIES
    :raises NoSuchPipeline: no pipeline has that name
    :raises MultiplePipelines: more than one pipeline has that name
    :raises NoSuchActivityError: the pipeline has no ``@pipelineState`` field
    """
    cl = boto3.client('datapipeline', region)
    # list_pipelines returns results one page at a time; walk every page so a
    # matching pipeline beyond the first page is not missed.
    pipelines = []
    for page in cl.get_paginator('list_pipelines').paginate():
        pipelines.extend(p for p in page['pipelineIdList'] if p['name'] == pipeline_name)
    if not pipelines:
        raise NoSuchPipeline(u'No pipeline "{}"'.format(pipeline_name))
    if len(pipelines) > 1:
        raise MultiplePipelines(u'Multiple pipelines called "{}"'.format(pipeline_name))
    pipeline_id = pipelines[0]['id']
    description = cl.describe_pipelines(pipelineIds=[pipeline_id])
    fields = description['pipelineDescriptionList'][0]['fields']
    # The state is stored as a key/value field rather than a top-level attribute.
    states = [item['stringValue'] for item in fields if item['key'] == '@pipelineState']
    if not states:
        raise NoSuchActivityError('The pipeline status is empty')
    return ''.join(states)
def poll_for_status(pipeline_name, terminating_statuses, callback=None,
                    region='us-east-1', interval=30):
    """Poll a Data Pipeline's state until a stop condition is met.

    :param pipeline_name: (str) name of the Pipeline
    :param terminating_statuses: container of states that end polling
    :param callback: optional callable receiving each state that is not in
        ``terminating_statuses``; returning a truthy value also ends polling
    :param region: (str) AWS region
    :param interval: seconds to wait between polls
    :return: whatever the generic poller returns once polling stops
    """
    # Local imports: this module does not import functools at file level, and
    # calling ``poll_for_status`` here would recurse into this function rather
    # than reach the generic polling loop in aws_utils.status.
    import functools
    from aws_utils.status import poller

    # Combine the terminating statuses and the optional user callback.
    def cb(status):
        if status in terminating_statuses:
            return True
        if callback is None:
            # No user callback: keep polling until a terminating status.
            return False
        return callback(status)

    # Construct the status getter for the pipeline.
    status_getter = functools.partial(get_status, pipeline_name, region)
    return poller(status_getter, cb, interval=interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment