Created
February 11, 2019 20:02
-
-
Save robcowie/792f6731f4b94b78fd680d0d0a9eee17 to your computer and use it in GitHub Desktop.
EC2 Airflow Operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
NOTE THIS IS UNTESTED AS IT WAS NOT REQUIRED. | |
See: | |
- https://github.com/apache/airflow/blob/master/airflow/contrib/hooks/aws_hook.py | |
- https://github.com/apache/airflow/blob/master/airflow/contrib/operators/ecs_operator.py | |
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances | |
- https://stackabuse.com/automating-aws-ec2-management-with-python-and-boto3/ | |
- https://devops.stackexchange.com/questions/3264/how-to-run-a-script-or-a-command-on-ec2-instance-via-aws-cli?rq=1 | |
We have granted AmazonSSMFullAccess and EMRAllActions to IAM user CloudComposer.pipeline.platform | |
""" | |
from airflow.contrib.hooks.aws_hook import AwsHook | |
from airflow.exceptions import AirflowException | |
from airflow.models import BaseOperator | |
from airflow.plugins_manager import AirflowPlugin | |
from airflow.utils import apply_defaults | |
DEFAULT_INSTANCE_CONFIG = { | |
'BlockDeviceMappings': [ | |
{ | |
'DeviceName': '/dev/xvda', | |
'Ebs': { | |
'VolumeSize': 123, | |
'VolumeType': 'gp2', | |
'DeleteOnTermination': True | |
} | |
}, | |
], | |
'ImageId': 'ami-04681a1dbd79675a5', | |
'InstanceType': 'm1.medium', | |
'MaxCount': 1, | |
'MinCount': 1, | |
'Monitoring': { | |
'Enabled': False | |
} | |
} | |
class CreateEC2Operator(BaseOperator): | |
ui_color = '#f0ede4' | |
client = None | |
template_fields = ('instance_config', 'user_data') | |
@apply_defaults | |
def __init__(self, instance_config, aws_conn_id=None, region_name=None, user_data=None, **kwargs): | |
super(CreateEC2Operator, self).__init__(**kwargs) | |
self.aws_conn_id = aws_conn_id | |
self.region_name = region_name | |
self.instance_config = instance_config | |
self.user_data = user_data | |
self.hook = self.get_hook() | |
def execute(self, context): | |
self.log.info('Starting EC2 instance') | |
conn = self.hook.get_connection(self.aws_conn_id) | |
self.resource = self.hook.get_resource_type('ec2', region_name=self.region_name) | |
if self.user_data: | |
self.instance_config['UserData'] = self.user_data | |
self.log.info(self.instance_config) | |
response = self.resource.create_instances(**self.instance_config) | |
# TODO: Return instance id (xcom) | |
# TODO: Give instance a name | |
# TODO: Give Airflow IAM user SSM access perms | |
def get_hook(self): | |
return AwsHook(aws_conn_id=self.aws_conn_id) | |
class EC2Plugin(AirflowPlugin): | |
name = 'ec2' | |
operators = [CreateEC2Operator] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment