Skip to content

Instantly share code, notes, and snippets.

@JesseCrocker
Last active June 13, 2022 23:53
Embed
What would you like to do?
Simple ECS service deploy script

Deploy updates to AWS ECS services based on GitHub commits. Posts deploy notifications to Slack.

Assumptions:

  • Task definitions map 1:1 with services, and they have the same name.
  • Code is stored on github
  • You want to deploy the latest commit in a branch
  • Docker images are tagged with commit SHA
  • Docker images are stored on AWS ECR
#!/usr/bin/env python3
import boto3
import logging
from optparse import OptionParser
import requests
import json
import sys
import time
import os
import traceback
# Region every boto3 client in this script is created in.
AWS_REGION="us-east-1"
# Default ECS cluster to deploy to; a SERVICES entry may override it
# with its own 'cluster' key (see _main).
DEFAULT_CLUSTER = "XXX"
# (user, token) pair for GitHub API basic auth; values redacted.
GITHUB_AUTH = ("XXX", "XXX")
# ECR registry id (the AWS account id) and repository holding the images.
ECR_ID = "XXX"
ECR_REPO = "XXX"
# "owner/repo" on GitHub whose commits the docker image tags correspond to.
GITHUB_REPO = "trailbehind/XXX"
# Incoming-webhook URL used for deploy notifications.
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX"
# Deploy targets keyed by CLI argument. service_name doubles as the task
# definition family (the script assumes a 1:1 name match); branch is the
# git branch whose tip commit is deployed by default.
SERVICES = {
    "production": {
        "service_name": "routing-production",
        "branch": "master"
    },
    "staging": {
        "service_name": "routing-staging",
        "branch": "staging"
    },
}
def get_latest_commit(branch, repo=None):
    """Return the SHA of the tip commit of *branch* via the GitHub refs API."""
    url = f"https://api.github.com/repos/{repo}/git/refs/heads/{branch}"
    resp = requests.get(url, auth=GITHUB_AUTH)
    if resp.status_code != 200:
        raise Exception("Failed to get github latest commit: " + url)
    return resp.json()['object']['sha']
def get_commit_message(repo, sha):
    """Return the commit message for *sha* via the GitHub commits API."""
    url = f"https://api.github.com/repos/{repo}/commits/{sha}"
    resp = requests.get(url, auth=GITHUB_AUTH)
    if resp.status_code != 200:
        raise Exception("Failed to get github commit message: " + url)
    return resp.json()['commit']['message']
def update_task_definition(client, task_name, new_tag):
    """Register a new revision of task definition *task_name* with every
    container image retagged to *new_tag*.

    Returns a tuple of (new task definition ARN, awslogs group,
    awslogs stream prefix) — the log location is derived from the first
    container's awslogs configuration.
    """
    logging.info("Updating task definition %s to image %s", task_name, new_tag)
    logging.debug("Fetching task definition for %s", task_name)
    response = client.describe_task_definition(
        taskDefinition=task_name
    )
    existing_task_definition = response['taskDefinition']
    logging.debug("existing task definition: " + json.dumps(existing_task_definition, indent=4, sort_keys=True))
    container_definitions = existing_task_definition['containerDefinitions']
    for c in container_definitions:
        # BUG FIX: split(":") broke on registry hosts that include a port
        # (e.g. "host:5000/img:tag"); rsplit only cuts at the final colon.
        image, _old_tag = c['image'].rsplit(":", 1)
        c['image'] = image + ":" + new_tag
    task_definition = {
        'containerDefinitions': container_definitions,
    }
    # keys that need a blank value if one isn't specified
    for key in ('taskRoleArn',):
        task_definition[key] = existing_task_definition.get(key, '')
    # keys to copy only when present — EC2 and Fargate task definitions
    # carry different subsets of these
    for key in ('cpu', 'executionRoleArn', 'family', 'memory', 'networkMode',
                'placementConstraints', 'requiresCompatibilities', 'volumes'):
        if key in existing_task_definition:
            task_definition[key] = existing_task_definition[key]
        else:
            logging.debug("key %s not found in task definition", key)
    logging.debug("new task definition: " + json.dumps(task_definition, indent=4, sort_keys=True))
    response = client.register_task_definition(**task_definition)
    logging.debug("Finished updating task definition %s", task_name)
    logging_config = container_definitions[0]['logConfiguration']['options']
    log_prefix = logging_config['awslogs-stream-prefix'] + "/" + container_definitions[0]['name']
    return response['taskDefinition']['taskDefinitionArn'], logging_config['awslogs-group'], log_prefix
def update_service(client, cluster, service_name, task_definition_arn):
    """Point an existing ECS service at a new task definition revision,
    keeping its current desired count and deployment configuration.

    Raises Exception if the service cannot be found in the cluster.
    """
    described = client.describe_services(
        cluster=cluster,
        services=[service_name]
    )
    if described is None or len(described['services']) == 0:
        raise Exception("Service %s not found" % service_name)
    current = described['services'][0]
    logging.debug("existing service definition" + json.dumps(current, default=str))
    client.update_service(
        cluster=current['clusterArn'],
        service=service_name,
        desiredCount=current['desiredCount'],
        taskDefinition=task_definition_arn,
        deploymentConfiguration=current['deploymentConfiguration'],
    )
## Monitoring task status
def wait_for_service_deploy_to_finish(client, cluster, service_name, sleep=15):
    """Poll until no tasks from a previous (ACTIVE) deployment remain,
    sleeping *sleep* seconds between polls."""
    while True:
        remaining = get_service_status_count(client, cluster, service_name).get('ACTIVE', 0)
        if remaining <= 0:
            break
        logging.info("Waiting for deploy of %s to finish, %i old deployments are still active" %
                     (service_name, remaining))
        time.sleep(sleep)
def get_service_status_count(client, cluster, service_name):
    """Return {deployment status: total runningCount} for a service,
    skipping STOPPED deployments.

    Raises Exception if the service cannot be found in the cluster.
    """
    described = client.describe_services(
        cluster=cluster,
        services=[service_name]
    )
    if described is None or len(described['services']) == 0:
        raise Exception("Service %s not found" % service_name)
    counts = {}
    for deployment in described['services'][0]['deployments']:
        state = deployment['status']
        if state == "STOPPED":
            continue
        counts[state] = counts.get(state, 0) + deployment['runningCount']
    return counts
## ECS agent
def update_cluster_ecs_agent(cluster, next_token=None):
    """Trigger an ECS agent update on every ACTIVE container instance in
    *cluster*, following list_container_instances pagination.

    next_token resumes pagination from a prior call (normally left None).
    """
    client = boto3.client('ecs', region_name=AWS_REGION)
    # BUG FIX: the original recursed as
    #   update_cluster_ecs_agent(client, cluster, next_token=...)
    # which raised TypeError whenever a second page existed, because the
    # signature is (cluster, next_token=None). Iterate instead.
    while True:
        kwargs = {'cluster': cluster, 'status': 'ACTIVE'}
        if next_token is not None:
            kwargs['nextToken'] = next_token
        response = client.list_container_instances(**kwargs)
        for arn in response['containerInstanceArns']:
            update_instance_ecs_agent(client, cluster, arn)
        next_token = response.get('nextToken')
        if next_token is None:
            break
def update_instance_ecs_agent(client, cluster, arn):
    """Request an ECS agent update on one container instance.

    update_container_agent raises (e.g. NoUpdatesAvailableException) when
    the agent is already current; that is treated as a normal outcome.
    """
    try:
        client.update_container_agent(cluster=cluster, containerInstance=arn)
        logging.info("updating instance %s", arn)
    except Exception:
        # was a bare except:, which also swallowed KeyboardInterrupt/SystemExit
        logging.info("No update available for instance %s", arn)
## ECR
def verify_image_exists(registryId, repositoryName, tag):
    """Return True when ECR holds an image tagged *tag* in the repository."""
    ecr = boto3.client('ecr', region_name=AWS_REGION)
    try:
        result = ecr.describe_images(
            registryId=registryId,
            repositoryName=repositoryName,
            imageIds=[{'imageTag': tag}],
        )
    except Exception:
        # describe_images raises when the tag is absent; report "not found"
        return False
    return len(result['imageDetails']) > 0
# Fallback service
def deploy(docker_tag="master", cluster=None, service_name=None, wait=True, username=None, dry_run=False):
    """Deploy the image tagged *docker_tag* (a commit SHA) to an ECS service.

    Verifies the image exists in ECR, announces the deploy on Slack,
    registers a new task definition revision, and updates the service.
    With dry_run, the service update is skipped (note the new task
    definition revision is still registered).

    Raises Exception if the image tag is not found in ECR.
    """
    if not verify_image_exists(ECR_ID, ECR_REPO, docker_tag):
        raise Exception("Error, docker image with tag %s not found" % docker_tag)
    try:
        commit_text = get_commit_message(GITHUB_REPO, docker_tag)
    except Exception:
        # was a bare except:; the commit message is decoration only,
        # so fall back to a placeholder rather than aborting the deploy
        logging.exception("Failed to get commit text")
        commit_text = "unknown commit text"
    _notify_slack("%s is Deploying <https://github.com/%s/commit/%s|'%s'> to %s" %
                  (username, GITHUB_REPO, docker_tag, commit_text.replace("\n", ". "), service_name), dry_run=dry_run)
    client = boto3.client('ecs', region_name=AWS_REGION)
    task_definition_arn, log_group, log_prefix = update_task_definition(client, service_name, docker_tag)
    logging.info("Updating " + service_name)
    if not dry_run:
        update_service(client, cluster, service_name, task_definition_arn)
        if wait:
            wait_for_service_deploy_to_finish(client, cluster, service_name)
        logging.info("Updating %s finished" % service_name)
    _notify_slack("Deploying %s finished" % (service_name,), dry_run=dry_run)
## Notifying other services
def _notify_slack(message, dry_run=False):
if dry_run:
logging.info("SLACK:" + message)
return
try:
r = requests.post(SLACK_WEBHOOK, json={"text":message})
except:
logging.error("error notifying slack")
def _main():
    """Command-line entry point.

    Usage: prog [options] (agent | production | staging)

    "agent" updates the ECS agent on every instance in the default
    cluster; a SERVICES key deploys the tip of that service's branch
    (or the commit/branch given with -c / -b).
    """
    usage = "usage: %prog"
    parser = OptionParser(usage=usage,
                          description="")
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Turn on debug logging")
    parser.add_option("-q", "--quiet", action="store_true", dest="quiet",
                      help="turn off all logging")
    parser.add_option("-c", "--commit", action="store", dest="commit",
                      help="deploy specified commit instead of latest commit")
    parser.add_option("-b", "--branch", action="store", dest="branch",
                      help="deploy latest commit from specified branch instead of default branch")
    parser.add_option("-D", "--dry-run", action="store_true", dest="dry_run",
                      help="Dry run. Don't really deploy.")
    parser.add_option("-n", "--no-wait", action="store_true", dest="no_wait",
                      help="Dont wait for services to restart")
    (options, args) = parser.parse_args()
    # -d wins over -q; default level is INFO
    logging.basicConfig(level=logging.DEBUG if options.debug else
                        (logging.ERROR if options.quiet else logging.INFO),
                        format='DEPLOY %(asctime)s - %(message)s', datefmt="%H:%M:%S")
    # silence noisy HTTP/credential loggers from requests and botocore
    logging.getLogger('botocore.vendored.requests.packages.urllib3.connectionpool').setLevel(logging.WARNING)
    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARNING)
    logging.getLogger('botocore.credentials').setLevel(logging.WARNING)
    if len(args) != 1:
        logging.error("Error, expecting arg either agent or a service: " + ", ".join(SERVICES.keys()))
        sys.exit(-1)
    cluster = DEFAULT_CLUSTER
    if args[0] == "agent":
        _notify_slack("Updating ECS agent")
        update_cluster_ecs_agent(cluster)
        return
    elif args[0] in SERVICES:
        service_name = SERVICES[args[0]]['service_name']
        branch = SERVICES[args[0]]['branch']
        # a service entry may target a non-default cluster
        if 'cluster' in SERVICES[args[0]]:
            cluster = SERVICES[args[0]]['cluster']
    else:
        logging.error("Error, expecting arg either production or staging")
        sys.exit(-1)
    # -b overrides the service's default branch; -c overrides the branch
    # lookup entirely and deploys that exact commit
    if options.branch:
        branch = options.branch
    if options.commit:
        docker_tag = options.commit
    else:
        docker_tag = get_latest_commit(branch, repo=GITHUB_REPO)
    try:
        deploy(
            service_name=service_name,
            docker_tag=docker_tag,
            cluster=cluster,
            dry_run=options.dry_run,
            wait=not options.no_wait,
            username=os.getenv("USERNAME", os.getenv("USER")),
        )
    except Exception as e:
        # report failures to Slack with the full traceback, then log locally
        _notify_slack("Error deploying:\n" + traceback.format_exc())
        logging.exception("Error deploying")
if __name__ == "__main__":
    _main()
@JesseCrocker
Copy link
Author

Now updated to work with Fargate tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment