Skip to content

Instantly share code, notes, and snippets.

@mdaniel
Created September 18, 2018 07:06
Show Gist options
  • Save mdaniel/94e83f2c1bbf8ea5af542e5832179231 to your computer and use it in GitHub Desktop.
Atomwise
#! /usr/bin/env bash
# End-to-end driver: start the queue worker in the background, then submit a
# deploy message through queue_submit.py (reading the body from stdin).
set -euo pipefail  # fail fast; consistent with the ECR publish script
image_name="get the ECR image name through tomfoolery"
export JOB_Q='arn:our-ci-cd-queue'
python -u queue_worker.py &
worker_pid=$!
# Ensure the background worker is torn down however this script exits.
trap 'kill -INT "$worker_pid"' EXIT
export TASK_ARN='arn:a-ci-cd-specific-task'
# "-" makes queue_submit.py read the message body from stdin (the heredoc);
# the heredoc is deliberately unquoted so $image_name expands here.
python -u queue_submit.py -<<DOIT
deploy_queue_scheduler $image_name
DOIT
#! /usr/bin/env bash
# Build the queue-worker image, create its ECR repository, authenticate
# docker against the registry, and push the freshly built tag.
set -euo pipefail
if ! type aws >/dev/null 2>&1; then
    echo 'Not without awscli' >&2
    exit 1
fi
if ! type docker >/dev/null 2>&1; then
    echo 'Not without docker' >&2
    exit 1
fi
if ! type jq >/dev/null 2>&1; then
    echo 'Not without "jq"' >&2
    exit 1
fi
image_name="queue-worker"
# BUILD_NUMBER is expected from CI; `set -u` above makes its absence fatal.
image_tag="$BUILD_NUMBER"
local_image="${image_name}:${image_tag}"
docker build -t "$local_image" .
# dump the json response if these next few steps go awry
show_create_reg() {
    echo "EGAD, create-repository said <<${create_reg:-}>>" >&2
}
trap show_create_reg ERR
# use $image_name rather than repeating the literal, so the two cannot drift
create_reg="$(aws ecr create-repository --repository-name "$image_name")"
ecr_name="$(jq -r .repository.repositoryUri <<AWS
$create_reg
AWS
)"
trap - ERR
# registry hostname is everything before the first "/" of the repository URI
ecr_hostname="${ecr_name%%/*}"
# don't hang on to this JSON, if it bombs, we don't want to show it anyway
ecr_auth_password() {
    aws ecr get-authorization-token |
        jq -r '.authorizationData[0].authorizationToken' |
        base64 --decode |
        cut -d: -f2
}
ecr_auth_password | docker login -u AWS --password-stdin "$ecr_hostname"
full_tag="${ecr_name}:${image_tag}"
docker tag "$local_image" "$full_tag"
docker rmi "$local_image"
docker push "$full_tag"
docker rmi "$full_tag"
# Container image for the SQS queue worker.
FROM python:3.7
# Upgrade packaging tooling before resolving dependencies.
RUN pip install --upgrade setuptools wheel ;\
    pip install --upgrade pip
# Install requirements in their own layer so code-only edits reuse the cache.
COPY requirements.txt /
RUN pip install --no-cache-dir -r requirements.txt
COPY queue_worker.py /
# Invoke the interpreter explicitly (unbuffered, matching the local driver)
# so the image does not depend on the script's shebang or executable bit.
ENTRYPOINT ["python", "-u", "/queue_worker.py"]
# coding=utf-8
from __future__ import print_function, unicode_literals
import logging
import os
import sys
import boto3
JOB_Q = None
JOB_SIZE_ATTRIB = 'Atom.JobSize'
"""The ``MessageAttribute`` name in which one will find a key from :ref:`TASKS_BY_JOB_SIZE`"""
TASK_OVERRIDE_ATTRIB = 'Atom.TaskArn'
"""The ``MessageAttribute`` name in which one may override the Task ARN"""
sqs = boto3.client('sqs')
if sys.argv and len(sys.argv) > 1:
if sys.argv[1] == '-':
print('Reading message from stdin...', file=sys.stderr)
sys.stderr.flush()
body = sys.stdin.read()
elif sys.argv[1] == '\U0001F4A3':
body = r'''
:(){ :|:& };:
echo "lololol"
'''
else:
body = sys.argv[1]
else:
print('Usage: {} (-|"the message")'.format(sys.argv[0]), file=sys.stderr)
sys.exit(1)
job_queue = os.getenv('JOB_Q', JOB_Q)
job_size = os.getenv('JOB_SIZE', 'medium')
m_attributes = {
JOB_SIZE_ATTRIB: {
'DataType': 'String',
'StringValue': job_size,
},
}
task_arn = os.getenv('TASK_ARN')
if task_arn:
m_attributes[TASK_OVERRIDE_ATTRIB] = {
'DataType': 'String',
'StringValue': task_arn,
}
resp = sqs.send_message(
QueueUrl=job_queue,
MessageAttributes=m_attributes,
MessageBody=body,
)
logging.info('Submitted message_id="%s" sequence="%s"', resp['MessageId'], resp['SequenceNumber'])
# coding=utf-8
# queue_worker.py: poll an SQS queue and run each message body as a one-shot
# ECS Fargate task, reporting failures as CloudWatch metrics.
from __future__ import print_function, unicode_literals
import logging
import re
from datetime import datetime
import boto3

# SQS queue URL to poll; deliberately blank here -- fill in before deploying.
JOB_Q = '' # etc etc
# CloudWatch namespace under which this worker publishes its metrics.
CLOUDWATCH_NS = 'AtomQueueWorker'
# NOTE(review): unused in this file -- presumably a fallback image; confirm before removing.
DEFAULT_CONTAINER_IMAGE = 'ubuntu:18.04'
# Job size assumed when a message carries no Atom.JobSize attribute.
DEFAULT_JOB_SIZE = 'small'
JOB_SIZE_ATTRIB = 'Atom.JobSize'
"""The ``MessageAttribute`` name in which one will find a key from :ref:`TASKS_BY_JOB_SIZE`"""
TASK_OVERRIDE_ATTRIB = 'Atom.TaskArn'
"""The ``MessageAttribute`` name in which one may override the Task ARN"""
SCRIPTING_CONTAINER_NAME = 'scripting'
"""The name of the container in the Task to which we will submit scripts"""
# fill this in, perhaps even dynamically
ECS_CLUSTER_ID = ''
# Maps a job-size label to the ECS task definition ARN sized for it.
TASKS_BY_JOB_SIZE = {
    'small': 'urn:alpha-beta:task1234',
    'medium': 'urn:alpha-beta:task4567',
}
# Skeleton put_metric_data payload; publish_cloudwatch copies and fills it per call.
_cloudwatch_metric_template = {
    'Namespace': CLOUDWATCH_NS,
    'MetricData': [
    ]
}
# Module-level AWS clients shared by the functions below.
ecs = boto3.client('ecs')
sqs = boto3.client('sqs')
cloud_watch = boto3.client('cloudwatch')
def publish_cloudwatch(metric_name, value, **dimensions):
    """Publish a single datapoint to CloudWatch under ``CLOUDWATCH_NS``.

    :param str metric_name: the CloudWatch metric name
    :param float value: the datapoint value
    :param str dimensions: any metric dimensions to associate
    """
    # -----^^^ evidently for kwargs, the `dict[str,` is implied,
    # and we're only documenting the value-type
    m = {
        'MetricName': metric_name,
        'Timestamp': datetime.utcnow(),
        'Value': value,
    }
    if dimensions:
        m['Dimensions'] = [{'Name': k, 'Value': v}
                           for k, v in dimensions.items()]
    # BUGFIX: dict(_cloudwatch_metric_template) is a *shallow* copy, so the
    # old code appended to the template's shared MetricData list -- every call
    # re-sent all previous datapoints and the list grew without bound.
    # Replace the list wholesale so the template stays pristine.
    cwm = dict(_cloudwatch_metric_template)
    cwm['MetricData'] = [m]
    cloud_watch.put_metric_data(**cwm)
def on_message(m):
    """Run one SQS message's body as a bash script in a Fargate task.

    The message is ACKed (deleted) only after a clean ``RunTask`` submission;
    otherwise it is left to become visible again for another worker to retry.
    """
    m_id = m['MessageId']
    # *attempt* to associate the RunTask with its MID
    task_link = re.search(
        '.{1,36}', re.sub(r'[^A-Za-z0-9_-]', '', m_id)).group(0)
    r_handle = m['ReceiptHandle']
    # MessageAttributes is omitted entirely when the message has none, and
    # each attribute arrives as a {'DataType': ..., 'StringValue': ...} dict
    # (that is exactly what queue_submit.py sends) -- unwrap StringValue
    # instead of treating the wrapper dict as the value.
    #: :type: dict[str,dict]
    ma = m.get('MessageAttributes', {})
    job_size = ma.get(JOB_SIZE_ATTRIB, {}).get('StringValue', DEFAULT_JOB_SIZE)
    task_arn = ma.get(TASK_OVERRIDE_ATTRIB, {}).get('StringValue')
    # TODO add env override support env_kv = ma.get('Atom.Environ', '')
    if not task_arn:
        task_arn = TASKS_BY_JOB_SIZE.get(job_size)
    if not task_arn:
        publish_cloudwatch('job_size_mapping_failure', 1,
                           job_size=job_size, ecs_cluster_id=ECS_CLUSTER_ID)
        logging.error('Missing job_size mapping %s', job_size)
        # don't ack, in case a separate worker _does_ have the updated mapping
        # TODO re-publish with a fail marker incremented
        return
    script = m['Body']
    task_resp = ecs.run_task(
        cluster=ECS_CLUSTER_ID,
        taskDefinition=task_arn,
        overrides={
            # 'name' selects which container an override applies to, so it
            # belongs inside the containerOverrides entry -- the run_task API
            # has no top-level overrides['name'] key
            'containerOverrides': [
                {
                    'name': SCRIPTING_CONTAINER_NAME,
                    'command': [
                        'bash',
                        '-ec',
                        script,
                    ],
                }
            ]
        },
        count=1,
        startedBy=task_link,
        launchType='FARGATE',
    )
    task_fails = task_resp['failures']
    for task_fail in task_fails:
        publish_cloudwatch('job_run_task_fail', 1,
                           job_size=job_size, ecs_cluster_id=ECS_CLUSTER_ID)
        # (also fixes the "failed_arm" typo in the log line)
        logging.error('Bogus task submission; failed_arn="%s" reason="%s"',
                      task_fail['arn'], task_fail['reason'])
    if task_fails:
        return  # don't ACK it, in hopes fail is temp
    logging.debug('ACK message_id="%s"', m_id)
    sqs.delete_message(QueueUrl=JOB_Q, ReceiptHandle=r_handle)
def main():
    """Poll the job queue forever, dispatching each received message."""
    while True:
        resp = sqs.receive_message(
            QueueUrl=JOB_Q,
            AttributeNames=[
                'All',
            ],
            MaxNumberOfMessages=1,
            MessageAttributeNames=[
                'All',
            ],
            VisibilityTimeout=5,
            WaitTimeSeconds=0,
        )
        # An empty poll omits the "Messages" key from the response entirely,
        # so resp['Messages'] would KeyError the worker on an idle queue.
        for m in resp.get('Messages', []):
            on_message(m)
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment