Skip to content

Instantly share code, notes, and snippets.

@robcowie
Created February 11, 2019 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robcowie/792f6731f4b94b78fd680d0d0a9eee17 to your computer and use it in GitHub Desktop.
Save robcowie/792f6731f4b94b78fd680d0d0a9eee17 to your computer and use it in GitHub Desktop.
EC2 Airflow Operator
# -*- coding: utf-8 -*-
"""
NOTE THIS IS UNTESTED AS IT WAS NOT REQUIRED.
See:
- https://github.com/apache/airflow/blob/master/airflow/contrib/hooks/aws_hook.py
- https://github.com/apache/airflow/blob/master/airflow/contrib/operators/ecs_operator.py
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
- https://stackabuse.com/automating-aws-ec2-management-with-python-and-boto3/
- https://devops.stackexchange.com/questions/3264/how-to-run-a-script-or-a-command-on-ec2-instance-via-aws-cli?rq=1
We have granted AmazonSSMFullAccess and EMRAllActions to IAM user CloudComposer.pipeline.platform
"""
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults
DEFAULT_INSTANCE_CONFIG = {
'BlockDeviceMappings': [
{
'DeviceName': '/dev/xvda',
'Ebs': {
'VolumeSize': 123,
'VolumeType': 'gp2',
'DeleteOnTermination': True
}
},
],
'ImageId': 'ami-04681a1dbd79675a5',
'InstanceType': 'm1.medium',
'MaxCount': 1,
'MinCount': 1,
'Monitoring': {
'Enabled': False
}
}
class CreateEC2Operator(BaseOperator):
ui_color = '#f0ede4'
client = None
template_fields = ('instance_config', 'user_data')
@apply_defaults
def __init__(self, instance_config, aws_conn_id=None, region_name=None, user_data=None, **kwargs):
super(CreateEC2Operator, self).__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.instance_config = instance_config
self.user_data = user_data
self.hook = self.get_hook()
def execute(self, context):
self.log.info('Starting EC2 instance')
conn = self.hook.get_connection(self.aws_conn_id)
self.resource = self.hook.get_resource_type('ec2', region_name=self.region_name)
if self.user_data:
self.instance_config['UserData'] = self.user_data
self.log.info(self.instance_config)
response = self.resource.create_instances(**self.instance_config)
# TODO: Return instance id (xcom)
# TODO: Give instance a name
# TODO: Give Airflow IAM user SSM access perms
def get_hook(self):
return AwsHook(aws_conn_id=self.aws_conn_id)
class EC2Plugin(AirflowPlugin):
name = 'ec2'
operators = [CreateEC2Operator]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment