Skip to content

Instantly share code, notes, and snippets.

@robcowie
Last active June 13, 2016 16:41
Show Gist options
  • Save robcowie/41e73cf69f5ed53b0dcf2798bb7ce142 to your computer and use it in GitHub Desktop.
Save robcowie/41e73cf69f5ed53b0dcf2798bb7ce142 to your computer and use it in GitHub Desktop.
import logging
import time
# Logger for this module, keyed by the module's __name__.
logger = logging.getLogger(__name__)
class NoSuchActivityError(Exception):
    """Raised when a status lookup yields an empty status value."""
def poller(status_getter, callback, interval=30):
    """Repeatedly fetch a status and hand it to `callback` until it is truthy.

    Each round calls ``status_getter()`` and passes the result to
    ``callback``. Polling ends (returning ``None``) as soon as the callback
    returns a truthy value; otherwise the loop sleeps for ``interval``
    seconds and tries again.

    :param status_getter: zero-argument callable returning the current status
    :param callback: callable taking the status; a truthy result stops polling
    :param interval: seconds to sleep between rounds
    :raises Exception: anything raised by ``status_getter`` or ``callback``
        is logged and re-raised unchanged
    """
    finished = False
    while not finished:
        try:
            current = status_getter()
        except Exception:
            # TODO: What to do about this? Retry?
            logger.exception('Unable to get jobflow status')
            raise

        try:
            # Truthiness is evaluated inside the try so a failure there is
            # logged the same way as a failure in the callback itself.
            finished = bool(callback(current))
        except Exception:
            logger.exception('Error calling polling callback')
            raise

        if not finished:
            time.sleep(interval)
import functools
from aws_utils.status import NoSuchActivityError, poll_for_status
import boto3
class NoSuchEMRCluster(Exception):
    """Raised when no listed EMR cluster has the requested name."""
class MultipleEMRClusters(Exception):
    """Raised when more than one listed EMR cluster has the requested name."""
def get_status(emr_name, region):
    """Connect to AWS and get the status of an EMR cluster.

    :param emr_name: (str) name of the cluster
    :param region: (str) AWS region
    :return: current status of the EMR cluster
    :raises NoSuchEMRCluster: no listed cluster is named ``emr_name``
    :raises MultipleEMRClusters: more than one listed cluster is named
        ``emr_name``
    :raises NoSuchActivityError: the matching cluster reports an empty state

    Possible return values : STARTING, BOOTSTRAPPING, RUNNING, WAITING,
    TERMINATING, TERMINATED, TERMINATED_WITH_ERRORS
    """
    client = boto3.client('emr', region)
    # NOTE(review): only the result of a single list_clusters() call is
    # inspected; if the API pages its results, clusters on later pages are
    # never seen -- confirm whether a paginator is needed here.
    listing = client.list_clusters()
    matches = [c for c in listing['Clusters'] if c['Name'] == emr_name]
    if not matches:
        raise NoSuchEMRCluster(u'No EMR cluster "{}"'.format(emr_name))
    if len(matches) > 1:
        raise MultipleEMRClusters(u'Multiple EMR clusters called "{}"'.format(emr_name))
    # Read the state from the single matching cluster. The comprehension's
    # loop variable must not be used here: it is not the match, and is not
    # even defined outside the comprehension on Python 3.
    status = matches[0]['Status']['State']
    if not status:
        raise NoSuchActivityError('The EMR cluster status is empty')
    return status
def poll_for_status(emr_name, terminating_statuses, callback=None,
                    region='us-east-1', interval=30):
    """Poll an EMR cluster's status until polling should stop.

    Polling stops when the current status is contained in
    ``terminating_statuses`` or when ``callback`` returns a truthy value.

    :param emr_name: (str) name of the cluster
    :param terminating_statuses: container of statuses that end polling
    :param callback: optional callable taking the current status; a truthy
        return value ends polling. When omitted, only
        ``terminating_statuses`` can end polling.
    :param region: (str) AWS region
    :param interval: seconds to wait between status checks
    :return: whatever ``poller`` returns (``None`` once polling stops)
    """
    # Combine callback and terminating_statuses into one stop predicate.
    def cb(status):
        if status in terminating_statuses:
            return True
        if callback is None:
            # No user callback: keep polling until a terminating status.
            return False
        return callback(status)

    # Construct the status getter for the cluster.
    status_getter = functools.partial(get_status, emr_name, region)
    # Delegate to the generic `poller` loop. Calling `poll_for_status` here
    # would re-enter this function with the wrong arguments.
    return poller(status_getter, cb, interval=interval)
from aws_utils.status import NoSuchActivityError
import boto3
class NoSuchPipeline(Exception):
    """Raised when no listed pipeline has the requested name."""
class MultiplePipelines(Exception):
    """Raised when more than one listed pipeline has the requested name."""
def get_status(pipeline_name, region):
    """Look up a Data Pipeline by name and return its current state.

    :param pipeline_name: (str) name of the Pipeline
    :param region: (str) AWS region
    :return: the pipeline's ``@pipelineState`` field value(s), joined into
        one string
    :raises NoSuchPipeline: no listed pipeline is named ``pipeline_name``
    :raises MultiplePipelines: more than one listed pipeline has that name
    :raises NoSuchActivityError: the description has no ``@pipelineState``
        field

    Possible return values : ACTIVATING, CANCELED, CASCADE_FAILED,
    DEACTIVATING, FAILED, FINISHED, INACTIVE, PAUSED, PENDING, RUNNING,
    SHUTTING_DOWN, SKIPPED, TIMEDOUT, VALIDATING, WAITING_FOR_RUNNER,
    WAITING_ON_DEPENDENCIES
    """
    client = boto3.client('datapipeline', region)  # 'us-east-1'
    listing = client.list_pipelines()
    candidates = [
        entry for entry in listing['pipelineIdList']
        if entry['name'] == pipeline_name
    ]

    match_count = len(candidates)
    if match_count == 0:
        raise NoSuchPipeline(u'No pipeline "{}"'.format(pipeline_name))
    if match_count > 1:
        raise MultiplePipelines(u'Multiple pipelines called "{}"'.format(pipeline_name))

    # Exactly one match: fetch its full description to read the state field.
    description = client.describe_pipelines(pipelineIds=[candidates[0]['id']])
    state_values = []
    for field in description['pipelineDescriptionList'][0]['fields']:
        if field['key'] == '@pipelineState':
            state_values.append(field['stringValue'])

    if not state_values:
        raise NoSuchActivityError('The pipeline status is empty')
    return ''.join(state_values)
def poll_for_status(pipeline_name, terminating_statuses, callback=None,
                    region='us-east-1', interval=30):
    """Poll a Data Pipeline's status until polling should stop.

    Polling stops when the current status is contained in
    ``terminating_statuses`` or when ``callback`` returns a truthy value.

    :param pipeline_name: (str) name of the Pipeline
    :param terminating_statuses: container of statuses that end polling
    :param callback: optional callable taking the current status; a truthy
        return value ends polling. When omitted, only
        ``terminating_statuses`` can end polling.
    :param region: (str) AWS region
    :param interval: seconds to wait between status checks
    :return: whatever ``poller`` returns (``None`` once polling stops)
    """
    # Combine callback and terminating_statuses into one stop predicate.
    def cb(status):
        if status in terminating_statuses:
            return True
        if callback is None:
            # No user callback: keep polling until a terminating status.
            return False
        return callback(status)

    # Construct the status getter for the pipeline. This must bind
    # `pipeline_name`; there is no `emr_name` in this function.
    status_getter = functools.partial(get_status, pipeline_name, region)
    # Delegate to the generic `poller` loop. Calling `poll_for_status` here
    # would re-enter this function with the wrong arguments.
    return poller(status_getter, cb, interval=interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment