Skip to content

Instantly share code, notes, and snippets.

@alessandrobologna
Created April 20, 2017 20:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alessandrobologna/d78f09c954f6126413c8e978fa8ceec9 to your computer and use it in GitHub Desktop.
Save alessandrobologna/d78f09c954f6126413c8e978fa8ceec9 to your computer and use it in GitHub Desktop.
import hashlib
import json
import re
import time
from datetime import datetime, timedelta

import boto3
import botocore.exceptions
from botocore.vendored import requests
# Values written to the 'Status' field of the response body built by send().
SUCCESS, FAILED = "SUCCESS", "FAILED"

# Seconds handed to time.sleep() between two consecutive polls.
SLEEP = 10

# Floor, in milliseconds, compared against context.get_remaining_time_in_millis():
# once less than this remains, the wait loops stop polling.
TIMEOUT_THRESHOLD = 20000
class TimeoutException(Exception):
    """Raised by the wait helpers when polling has to stop for now.

    lambda_handler catches it and re-invokes the function asynchronously
    with the current event so the wait can continue.
    """
class FailedException(Exception):
    """Raised by the wait helpers when the request did not finish within max_time."""
def _json_default(self, obj):
    """Fallback used by json for objects it cannot serialize natively.

    datetime instances are rendered with isoformat(); anything else
    becomes None (JSON null) instead of raising.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    return None


# Patched on the class itself, so it applies to every JSONEncoder in the process.
json.JSONEncoder.default = _json_default
def dump(d):
    """Serialize d to a JSON string using json.dumps with default options."""
    encoded = json.dumps(d)
    return encoded
def send(event, context, status, data, id, reason=None):
    """PUT the custom-resource response for this request to event['ResponseURL'].

    The JSON body carries status, reason, the physical resource id (id),
    the StackId / RequestId / LogicalResourceId copied from the event, and
    data. Failures of the PUT itself are printed, never raised. Returns None.
    """
    url = event['ResponseURL']
    body = {
        'Status': status,
        'Reason': reason,
        'PhysicalResourceId': id,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': data,
    }
    json_body = dump(body)
    print("Result:\n" + json_body)
    headers = {'content-type': '', 'content-length': str(len(json_body))}
    try:
        response = requests.put(url, data=json_body, headers=headers)
        print("Status code: " + response.reason)
    except Exception as e:
        # Best effort: a failed PUT is only logged.
        print("send failed: " + str(e))
def lower(s):
    """Return s with only its first character lower-cased (boto3 likes lower case).

    Falsy values ('' or None) are returned unchanged.
    """
    if not s:
        return s
    head, tail = s[:1], s[1:]
    return head.lower() + tail
def convert(k, s):
    """Return the pair (k, value), coercing s for keys whose type boto3 is picky about.

    Count/port/percent keys are passed through int(); 'waitForMonitors' is
    turned into a bool by matching a few affirmative words; every other
    value is returned as-is.
    """
    integer_keys = ('containerPort', 'desiredCount', 'maximumPercent', 'minimumHealthyPercent')
    affirmative = ('yes', 'true', 'sure', 'yup')
    if k in integer_keys:
        value = int(s)
    elif k in ('waitForMonitors',):
        value = s.lower() in affirmative
    else:
        value = s
    return (k, value)
def transform(d):
    """Recursively translate a CloudFormation-style structure into boto3 form.

    Dict keys go through lower() and each key/value pair through convert();
    lists are transformed element by element; any other value is returned
    untouched. Only exact dict / list instances are descended into.
    """
    if type(d) is list:
        return [transform(item) for item in d]
    if type(d) is not dict:
        return d
    return dict(convert(lower(key), transform(value)) for key, value in d.items())
def describe(client, cluster, service, contex):
    """Return the first entry of describe_services for one service, or None.

    Args:
        client: object exposing describe_services(cluster=..., services=[...]).
        cluster: cluster identifier passed through to describe_services.
        service: service identifier passed through to describe_services.
        contex: Lambda context; only get_remaining_time_in_millis() is used.
            (The parameter keeps its historical spelling so existing callers
            are unaffected.)

    Returns:
        The first element of the response's 'services' list, or None when the
        response has no such element (KeyError / TypeError / IndexError).

    Raises:
        TimeoutException: the call keeps being throttled and fewer than
            TIMEOUT_THRESHOLD milliseconds of execution time remain.
        botocore.exceptions.ClientError: any client error that is not a
            throttling error.
    """
    while True:
        try:
            return client.describe_services(cluster=cluster, services=[service])['services'][0]
        except (KeyError, TypeError, IndexError):
            return None
        except botocore.exceptions.ClientError as exception:
            if "ThrottlingException" not in str(exception):
                raise
            # The original read the undefined name `context` here, which
            # turned every throttled call into a NameError.
            if contex.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
                raise TimeoutException("Timeout while waiting to call describe_services again")
            # Back off, then retry on the next loop iteration.
            time.sleep(SLEEP)
def get_monitor_state(client, name):
    """Return the 'StateValue' of the alarm called name, or None.

    None is returned when name is falsy, when describe_alarms reports no
    'MetricAlarms', or when the first alarm has no 'StateValue'.
    """
    if not name:
        return None
    alarms = client.describe_alarms(AlarmNames=[name]).get('MetricAlarms', None)
    if not alarms:
        return None
    return alarms[0].get('StateValue', None)
def wait_monitors(client, context, cluster, name, resource, data, event, max_time):
    """Watch the recorded CloudWatch alarms for the rest of this invocation.

    event['MonitorStates'] maps alarm names to the state each one had when
    it was recorded. While more than TIMEOUT_THRESHOLD milliseconds of
    execution time remain, every alarm is polled each SLEEP seconds; an alarm
    whose state changed to 'ALARM' reports FAILED at once. If the loop ends
    without that happening, the service is described one last time and
    SUCCESS is reported.

    Args:
        client: ECS client, used for the final describe().
        context: Lambda context, used for the remaining-time checks.
        cluster, name: cluster and service passed to describe().
        resource: physical resource id sent back with the response.
        data: dict filled with 'Name' and 'Status' before reporting SUCCESS.
        event: the request event; supplies 'ServiceToken', 'MonitorStates'
            and 'RequestType'.
        max_time: accepted for signature parity with the other wait helpers;
            not used here.

    Returns:
        Whatever send() returns.

    Raises:
        FailedException: the service could not be described after monitoring.
    """
    region = event['ServiceToken'].split(':')[3]
    monitor_states = event.get('MonitorStates', {})
    cw = boto3.client('cloudwatch', region)
    print("Waiting for monitors signaling")
    while context.get_remaining_time_in_millis() > TIMEOUT_THRESHOLD:
        for monitor, previous_state in monitor_states.items():
            current_state = get_monitor_state(cw, monitor)
            print("%s: %s -> %s" % (monitor, previous_state, current_state))
            if current_state != previous_state and current_state == 'ALARM':
                return send(event, context, FAILED, data, resource,
                            "%s failed waiting for monitor %s with state %s"
                            % (event['RequestType'], monitor, current_state))
        time.sleep(SLEEP)
    response = describe(client, cluster, name, context)
    if response is None:
        # describe() returns None when the service is not in the response;
        # fail with a readable reason instead of an AttributeError.
        raise FailedException("%s failed: service %s could not be described"
                              % (event['RequestType'], name))
    # An empty or missing event list must not crash the success report.
    events = response.get('events') or []
    data['Name'] = response.get('serviceName')
    data['Status'] = events[0].get('message', None) if events else None
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def wait_events(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the newest service event mentions 'steady state.'.

    Flow:
      * If event['StartedTime'] is set and more than max_time seconds have
        passed since then, FAILED is reported immediately.
      * Otherwise event['Pending'] is set to "stabilize" and the newest
        service event is polled every SLEEP seconds.
      * Once steady state is seen and event['MonitorStates'] is set, the
        phase switches to "monitoring" by raising TimeoutException, so the
        caller re-invokes the function.
      * Otherwise SUCCESS is reported with the service name and the last
        event message in data.

    Args:
        client: ECS client passed to describe().
        context: Lambda context, used for the remaining-time checks.
        cluster, name: cluster and service passed to describe().
        resource: physical resource id sent back with the response.
        data: dict filled with 'Name' and 'Status' before reporting SUCCESS.
        event: the request event; read and updated in place.
        max_time: maximum number of seconds since event['StartedTime'].

    Returns:
        Whatever send() returns.

    Raises:
        TimeoutException: fewer than TIMEOUT_THRESHOLD milliseconds remain,
            or the monitoring phase has to take over.
    """
    started_time = event.get('StartedTime', None)
    if started_time and time.time() - started_time > max_time:
        # The message used to end with a stray ")".
        return send(event, context, FAILED, data, resource,
                    "%s failed after %s seconds elapsed" % (event['RequestType'], max_time))
    event['Pending'] = "stabilize"
    events = describe(client, cluster, name, context).get('events', [])[:1]
    print("Events:")
    if events:
        print(dump(events))
    previous = events
    # `or ''`: an event without a message counts as "not steady yet" instead
    # of raising TypeError on `in None`.
    while not events or 'steady state.' not in (events[0].get('message') or ''):
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for steady state")
        time.sleep(SLEEP)
        events = describe(client, cluster, name, context).get('events', [])[:1]
        if events and events != previous:
            # Log only when the newest event changed.
            print(dump(events))
            previous = events
    if event.get('MonitorStates', None):
        # Hand over to wait_monitors through a fresh invocation.
        event['Pending'] = "monitoring"
        raise TimeoutException("Waiting for monitors signal")
    response = describe(client, cluster, name, context)
    data['Name'] = response['serviceName']
    data['Status'] = events[0].get('message', None)
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def wait_deployments(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the service lists exactly one deployment, then wait for events.

    Sets event['Pending'] to "deployment" and polls the service's
    'deployments' every SLEEP seconds. Raises TimeoutException when fewer
    than TIMEOUT_THRESHOLD milliseconds of execution time remain, and
    FailedException when more than max_time seconds have passed since
    event['StartedTime']. On completion the result of wait_events() is
    returned.
    """
    started_time = event.get('StartedTime', None)
    event['Pending'] = "deployment"
    deployments = describe(client, cluster, name, context).get('deployments', [])
    last_logged = deployments
    print("Deployments:")
    print(dump(deployments))
    while len(deployments) != 1:
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for completed deployment")
        if started_time and time.time() - started_time > max_time:
            raise FailedException("%s did not complete after %s seconds elapsed" % (event['RequestType'], max_time))
        time.sleep(SLEEP)
        deployments = describe(client, cluster, name, context).get('deployments', [])
        if deployments != last_logged:
            # Log only when the deployment list changed.
            print(dump(deployments))
            last_logged = deployments
    return wait_events(client, context, cluster, name, resource, data, event, max_time)
def wait_inactive(client, context, cluster, name, resource, data, event, max_time):
    """Wait until the service status is INACTIVE, then report SUCCESS.

    Sets event['Pending'] to "deletion" and polls the service status every
    SLEEP seconds. A service that describe() no longer returns is treated as
    already gone, i.e. the same as INACTIVE.

    Args:
        client: ECS client passed to describe().
        context: Lambda context, used for the remaining-time checks.
        cluster, name: cluster and service passed to describe().
        resource: physical resource id sent back with the response.
        data: dict sent back as the response data.
        event: the request event; read and updated in place.
        max_time: maximum number of seconds since event['StartedTime'].

    Returns:
        Whatever send() returns.

    Raises:
        TimeoutException: fewer than TIMEOUT_THRESHOLD milliseconds remain.
        FailedException: more than max_time seconds have passed since
            event['StartedTime'].
    """
    def current_status():
        service = describe(client, cluster, name, context)
        # describe() returns None when the service is not in the response.
        # Nothing is left to wait for then, and calling .get() on None would
        # fail the whole delete.
        return service.get('status', None) if service else "INACTIVE"

    started_time = event.get('StartedTime', None)
    event['Pending'] = "deletion"
    status = current_status()
    previous = status
    print("%s status: %s" % (name, status))
    while status != "INACTIVE":
        if context.get_remaining_time_in_millis() < TIMEOUT_THRESHOLD:
            raise TimeoutException("Timeout waiting for service to become inactive")
        if started_time and time.time() - started_time > max_time:
            raise FailedException("%s did not complete after %s seconds elapsed" % (event['RequestType'], max_time))
        time.sleep(SLEEP)
        try:
            status = current_status()
        except botocore.exceptions.ClientError as exception:
            # A throttled poll keeps the previous status and tries again
            # later; any other client error is propagated.
            if "ThrottlingException" not in str(exception):
                raise
        if status != previous:
            print("%s status: %s" % (name, status))
            previous = status
    return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])
def lambda_handler(event, context):
    """Entry point: create, update or delete an ECS service for a custom resource.

    The handler serves two kinds of invocation:

      * A fresh request (event['RequestType'] is "Create", "Update" or
        "Delete"): the ECS call is made and the matching wait helper
        started.
      * A continuation (event['Pending'] is set): the handler re-invoked
        itself because a wait helper raised TimeoutException; the wait named
        by 'Pending' is resumed.

    Any other exception is reported back as FAILED through send().

    Args:
        event: request dict. Keys read here: 'ResourceProperties',
            'ServiceToken', 'RequestType', and optionally
            'PhysicalResourceId', 'StackId', 'LogicalResourceId', 'Pending'.
        context: Lambda context, handed to the wait helpers.

    Returns:
        The result of the wait helper / send() on the paths that return one,
        otherwise None.
    """
    resource = event.get('PhysicalResourceId', None)
    data = {}
    try:
        print("Event:\n" + dump(event))
        props = transform(event['ResourceProperties'])
        name = props.get('serviceName', None)
        cluster = props.get('cluster', None)
        # Checked before the resource id is assembled: a missing name used to
        # surface as a TypeError from the string concatenation below.
        if not name or not cluster:
            raise Exception("Missing service or cluster name")
        # maxTime belongs to this handler, not to ECS: pop it so it is never
        # forwarded to create_service/update_service, and make it a number so
        # the elapsed-time comparisons in the wait helpers are numeric.
        max_time = int(props.pop('maxTime', 3600))
        taskdef = props.get('taskDefinition', None)
        revision = taskdef.split(':')[-1] if taskdef else "0"
        # Everything before the last two ':'-separated fields of the task definition.
        arn = ':'.join(taskdef.split(':')[:-2]) if taskdef else ""
        resource = event.get('PhysicalResourceId', arn + ":service/" + name + ":" + revision)
        stack_id = event.get('StackId', None)
        logical_resource_id = event.get('LogicalResourceId', None)
        region = event['ServiceToken'].split(':')[3]
        monitors = props.pop("monitors", None)
        wait_for_monitors = props.pop("waitForMonitors", False)
        props.pop("serviceToken", None)
        try:
            client = boto3.client('ecs', region)

            # Is this a continuation invocation for this lambda?
            continuations = {
                "deployment": wait_deployments,
                "stabilize": wait_events,
                "deletion": wait_inactive,
                "monitoring": wait_monitors,
            }
            resume = continuations.get(event.get('Pending', None))
            if resume is not None:
                return resume(client, context, cluster, name, resource, data, event, max_time)

            if event['RequestType'] == "Delete":
                # Need to distinguish between an actual delete and a cleanup.
                if stack_id and logical_resource_id:
                    cf = boto3.client('cloudformation', region)
                    response = cf.describe_stack_resource(StackName=stack_id,
                                                          LogicalResourceId=logical_resource_id)
                    if resource == response.get('StackResourceDetail', {}).get('PhysicalResourceId', None):
                        # The resource id matches what is in CloudFormation: an actual delete.
                        response = describe(client, cluster, name, context)
                        if response and response['status'] == 'ACTIVE' and response['desiredCount'] > 0:
                            # Shut down tasks first.
                            response = client.update_service(cluster=cluster, service=name, desiredCount=0)
                        response = client.delete_service(cluster=cluster, service=name)
                        print("Delete response:\n" + dump(response))
                        event['StartedTime'] = time.time()
                        return wait_inactive(client, context, cluster, name, resource, data, event, max_time)
                    else:
                        print("Cleanup request for %s, will ignore it" % resource)
                return send(event, context, SUCCESS, data, resource, "%s successful" % event['RequestType'])

            if event['RequestType'] == "Create":
                props['clientToken'] = hashlib.md5(str(props)).hexdigest()
                print("Creating:\n" + dump(props))
                print("Create response:\n" + dump(client.create_service(**props)['service']))

            if event['RequestType'] == "Update":
                response = describe(client, cluster, name, context)
                # `not response` covers the "not found" half of the message;
                # the original condition let a missing service through.
                if not response or response['status'] != 'ACTIVE':
                    raise Exception("Service not found or INACTIVE, cannot be updated")
                # These properties are dropped before calling update_service.
                for create_only in ('loadBalancers', 'role', 'placementConstraints', 'placementStrategy'):
                    props.pop(create_only, None)
                props["service"] = props.pop("serviceName")
                print("Updating:\n" + dump(props))
                print("Update response:\n" + dump(client.update_service(**props)['service']))

            event['StartedTime'] = time.time()
            if monitors and wait_for_monitors:
                # Record the current state of every named alarm so that
                # wait_monitors can later detect a change to ALARM.
                cw = boto3.client('cloudwatch', region)
                states = {}
                for monitor in re.split(r',\s*', monitors):
                    state = get_monitor_state(cw, monitor)
                    if state:
                        states[monitor] = state
                    else:
                        print("Monitor %s was not found" % monitor)
                if states:
                    event['MonitorStates'] = states
            return wait_deployments(client, context, cluster, name, resource, data, event, max_time)
        except TimeoutException as e:
            # Out of execution time (or switching phase): re-invoke this
            # function asynchronously with the updated event.
            print("Passing the baton (%s)" % e)
            client = boto3.client('lambda', region)
            client.invoke(FunctionName=event['ServiceToken'], InvocationType='Event', Payload=dump(event))
    except Exception as e:
        send(event, context, FAILED, data, resource, ("%s failed: Exception: (%s)" % (event['RequestType'], e)))
    return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment