# Gist by @atharvai (last active November 28, 2018 23:18)
Description: >
This template deploys an EMR cluster and the necessary security groups, a bastion instance for SSH access,
and a SageMaker notebook instance configured with access to the EMR cluster.
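# This is the top-level (master) template; it nests the vpc, emr_sagemaker and stream_infrastructure
# templates referenced in the Resources below. A rough launch sketch with the AWS CLI follows; the file name
# master.template and the parameter values are assumptions, and the nested templates must already be staged
# under s3://<S3Bucket>-<region>/cloudformation/ as the TemplateURL joins expect:
#
#   aws cloudformation create-stack \
#     --stack-name sparkml-sagemaker-workshop \
#     --template-url https://s3.amazonaws.com/<S3Bucket>-<region>/cloudformation/master.template \
#     --parameters ParameterKey=KeyName,ParameterValue=<your-ec2-key-pair> \
#     --capabilities CAPABILITY_IAM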
Parameters:
EnvironmentName:
Description: An environment name that will be prefixed to resource names
Type: String
MinLength: 1
MaxLength: 41
AllowedPattern: ^[a-zA-Z0-9-]*$
Default: 'sparkml-sagemaker-workshop'
VpcCIDR:
Description: Please enter the IP range (CIDR notation) for the VPC to be created
Type: String
Default: 10.192.0.0/16
PublicSubnetCIDR:
Description: Please enter the IP range (CIDR notation) for the public subnet
Type: String
Default: 10.192.10.0/24
PrivateSubnetCIDR:
Description: Please enter the IP range (CIDR notation) for the private subnet in the first Availability Zone
Type: String
Default: 10.192.20.0/24
KeyName:
Description: SSH key (for access to bastion and EMR instances)
Type: AWS::EC2::KeyPair::KeyName
Default: qwiklabskey
S3Bucket:
Description: S3 bucket where the CloudFormation templates, scripts and config files are stored
Type: String
Default: sparkml-sagemaker-workshop
LambdaKinesisEventProcessorS3Key:
Type : String
Default : functions
Description : S3 key prefix under which the zipped Kinesis event processor Lambda function packages are stored.
# OutputBucket:
# Description: Enter a unique name for the S3 bucket to hold your working data
# Type: String
Resources:
VpcStack:
Type: AWS::CloudFormation::Stack
Properties:
TemplateURL: !Join ['', ['https://s3.amazonaws.com/', !Ref 'S3Bucket', '-', !Ref 'AWS::Region', '/cloudformation/vpc.template']]
TimeoutInMinutes: '60'
Parameters:
EnvironmentName: !Ref EnvironmentName
VpcCIDR: !Ref VpcCIDR
PublicSubnetCIDR: !Ref PublicSubnetCIDR
PrivateSubnetCIDR: !Ref PrivateSubnetCIDR
KeyName: !Ref KeyName
# OutputBucket: !Ref OutputBucket
EmrSagemakerStack:
Type: AWS::CloudFormation::Stack
DependsOn: VpcStack
Properties:
TemplateURL: !Join ['', ['https://s3.amazonaws.com/', !Ref 'S3Bucket', '-', !Ref 'AWS::Region', '/cloudformation/emr_sagemaker.template']]
TimeoutInMinutes: '60'
Parameters:
EnvironmentName: !Ref EnvironmentName
KeyName: !Ref KeyName
Vpc: !GetAtt VpcStack.Outputs.VPC
PublicSubnet: !GetAtt VpcStack.Outputs.PublicSubnet
S3Bucket: !Ref S3Bucket
S3OutputBucket: !GetAtt VpcStack.Outputs.OutputS3Bucket
# GlueStack:
# Type: AWS::CloudFormation::Stack
# DependsOn: VpcStack
# Properties:
# TemplateURL: !Join ['', ['https://s3.amazonaws.com/', !Ref 'S3Bucket', '-', !Ref 'AWS::Region', '/cloudformation/glue.template']]
# TimeoutInMinutes: '60'
# Parameters:
# EnvironmentName: !Ref EnvironmentName
StreamInfrastructure:
Type: AWS::CloudFormation::Stack
DependsOn: VpcStack
Properties:
TemplateURL: !Join ['', ['https://s3.amazonaws.com/', !Ref 'S3Bucket', '-', !Ref 'AWS::Region', '/cloudformation/stream_infrastructure.template']]
TimeoutInMinutes: '60'
Parameters:
LambdaS3Bucket: !Join ['', [!Ref 'S3Bucket', '-', !Ref 'AWS::Region']]
LambdaKinesisEventProcessorS3Key: !Ref LambdaKinesisEventProcessorS3Key
S3OutputBucket: !GetAtt VpcStack.Outputs.OutputS3Bucket
Outputs:
BastionIp:
Description: IP address of the bastion instance
Value: !GetAtt VpcStack.Outputs.BastionIp
EmrMasterNodeDns:
Description: Public DNS name of the master EMR instance
Value: !GetAtt EmrSagemakerStack.Outputs.EmrMasterNodeDns
SagemakerRoleArn:
Description: ARN of the SageMaker IAM role
Value: !GetAtt EmrSagemakerStack.Outputs.SagemakerRoleArn
SagemakerNotebookInstanceName:
Description: Name of the SageMaker notebook instance
Value: !GetAtt EmrSagemakerStack.Outputs.SagemakerNotebookInstanceName
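# Once the stack is CREATE_COMPLETE, the outputs above can be read back with, for example:
#   aws cloudformation describe-stacks --stack-name sparkml-sagemaker-workshop --query 'Stacks[0].Outputs'
#
# ---------------------------------------------------------------------------------------------------
# Nested template: cloudformation/emr_sagemaker.template (referenced by EmrSagemakerStack above)
# ---------------------------------------------------------------------------------------------------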
Description: >
This template deploys an EMR cluster and the necessary security groups, a bastion instance for SSH access,
and a SageMaker notebook instance configured with access to the EMR cluster.
Parameters:
EnvironmentName:
Description: An environment name that will be prefixed to resource names
Type: String
KeyName:
Description: SSH key (for access to bastion and EMR instances)
Type: AWS::EC2::KeyPair::KeyName
Default: qwiklabskey
Vpc:
Description: VPC Id
Type: String
Default: ''
PublicSubnet:
Description: Public Subnet ID for Bastion instance
Type: String
Default: ''
S3Bucket:
Description: S3 bucket where the CloudFormation templates, scripts and config files are stored
Type: String
Default: sparkml-sagemaker-workshop
S3OutputBucket:
Description: S3 output bucket created by the CloudFormation VPC stack
Type: String
NotebookInstanceType:
Description: Notebook instance type
Type: String
Default: ml.m4.2xlarge
Conditions:
VPCParamExists: !Not [ !Equals [!Ref Vpc, '']]
PublicSubnetParamExists: !Not [ !Equals [!Ref PublicSubnet, '']]
Resources:
EmrCluster:
Type: "AWS::EMR::Cluster"
Properties:
Applications:
- Name: Presto
- Name: Spark
- Name: Hive
- Name: Livy
BootstrapActions:
- Name: Install MLeap as EMR bootstrap script
ScriptBootstrapAction:
Path: !Sub s3://${S3Bucket}-${AWS::Region}/cloudformation/scripts/emr-bootstrap.sh
Configurations:
- Classification: spark
ConfigurationProperties:
"maximizeResourceAllocation": "true"
- Classification: spark-defaults
ConfigurationProperties:
"spark.driver.extraClassPath": "/home/hadoop/javalib/*:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"
- Classification: hive-site
ConfigurationProperties:
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
- Classification: presto-connector-hive
ConfigurationProperties:
"hive.metastore.glue.datacatalog.enabled": "true"
- Classification: spark-hive-site
ConfigurationProperties:
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
EbsRootVolumeSize: 10
Instances:
# EmrManagedMasterSecurityGroup: !Ref EmrMasterSecurityGroup
# EmrManagedSlaveSecurityGroup: !Ref EmrSlaveSecurityGroup
AdditionalMasterSecurityGroups:
- !Ref EmrAdditionalSecurityGroup
AdditionalSlaveSecurityGroups:
- !Ref EmrAdditionalSecurityGroup
Ec2KeyName: !Ref KeyName
Ec2SubnetId: !If [PublicSubnetParamExists, !Ref PublicSubnet, !ImportValue 'SparkSagemaker-PublicSubnet']
MasterInstanceGroup:
EbsConfiguration:
EbsBlockDeviceConfigs:
- VolumeSpecification:
SizeInGB: 32
VolumeType: gp2
InstanceCount: 1
InstanceType: m4.2xlarge
Market: ON_DEMAND
Name: Master instance group
CoreInstanceGroup:
EbsConfiguration:
EbsBlockDeviceConfigs:
- VolumeSpecification:
SizeInGB: 32
VolumeType: gp2
InstanceCount: 4
InstanceType: m4.2xlarge
Market: ON_DEMAND
Name: Core instance group
JobFlowRole: !Ref EmrInstanceProfile
#LogUri: !Join [ '', [ 's3://', !ImportValue 'SparkSagemaker-OutputS3Bucket', '/emrlogs/' ] ]
Name: !Sub ${EnvironmentName} EMR Cluster
ReleaseLabel: emr-5.12.2
ScaleDownBehavior: TERMINATE_AT_TASK_COMPLETION
ServiceRole: !Ref EmrServiceRole
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} EMR Cluster
VisibleToAllUsers: true
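# The Configurations block above wires Hive, Presto and Spark to the AWS Glue Data Catalog as their shared
# metastore. A quick manual sanity check, assuming you have tunnelled to the master node through the bastion
# (not part of this template), is to list the catalog databases from the master:
#   spark-sql -e 'SHOW DATABASES;'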
UpdateLivy:
Properties:
ActionOnFailure: CONTINUE
HadoopJarStep:
Args:
- !Sub s3://${S3Bucket}-${AWS::Region}/cloudformation/scripts/bootstrap-livy.sh
Jar: !Sub s3://${AWS::Region}.elasticmapreduce/libs/script-runner/script-runner.jar
MainClass: ''
JobFlowId: !Ref 'EmrCluster'
Name: UpdateLivy
Type: AWS::EMR::Step
EmrIamRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "ec2.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
- "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
- "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
- "arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess"
EmrServiceRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "elasticmapreduce.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
EmrInstanceProfile:
Type: "AWS::IAM::InstanceProfile"
Properties:
Roles:
- !Ref EmrIamRole
EmrAdditionalSecurityGroup:
Type: "AWS::EC2::SecurityGroup"
DependsOn: SagemakerSecurityGroup
Properties:
GroupName: EmrAdditionalSecurityGroup
GroupDescription: Allow SSH and SageMaker access
SecurityGroupIngress:
- SourceSecurityGroupId: !ImportValue 'SparkSagemaker-BastionAccessSecurityGroup'
Description: SSH
IpProtocol: tcp
FromPort: 22
ToPort: 22
- SourceSecurityGroupId: !Ref SagemakerSecurityGroup
Description: Sagemaker
IpProtocol: tcp
FromPort: 8998
ToPort: 8998
- SourceSecurityGroupId: !Ref SagemakerSecurityGroup
Description: EMR resource manager
IpProtocol: tcp
FromPort: 8088
ToPort: 8088
- SourceSecurityGroupId: !Ref SagemakerSecurityGroup
Description: EMR Spark UI Server
IpProtocol: tcp
FromPort: 18080
ToPort: 18080
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} EMR Additional Security Group
VpcId: !If [VPCParamExists, !Ref Vpc, !ImportValue 'SparkSagemaker-VPC']
# EmrSlaveSecurityGroup:
# Type: "AWS::EC2::SecurityGroup"
# Properties:
# GroupName: EmrSlaveSecurityGroup
# GroupDescription: Allow SSH and Sagemaker access
# SecurityGroupIngress:
# - SourceSecurityGroupId: !ImportValue 'SparkSagemaker-BastionAccessSecurityGroup'
# Description: SSH
# IpProtocol: tcp
# FromPort: 22
# ToPort: 22
# - SourceSecurityGroupId: !Ref SagemakerSecurityGroup
# Description: Sagemaker
# IpProtocol: tcp
# FromPort: 8998
# ToPort: 8998
# Tags:
# - Key: Name
# Value: !Sub ${EnvironmentName} EMR Additional Security Group
# VpcId: !If [VPCParamExists, !Ref Vpc, !ImportValue 'SparkSagemaker-VPC']
SagemakerRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "sagemaker.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
- "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
- 'arn:aws:iam::aws:policy/AWSLambdaFullAccess'
- 'arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess'
- "arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess"
- "arn:aws:iam::aws:policy/AmazonAthenaFullAccess"
SagemakerLifecycleConfig:
Type: "AWS::SageMaker::NotebookInstanceLifecycleConfig"
Properties:
NotebookInstanceLifecycleConfigName: !Sub ${EnvironmentName}LifecycleConfig
OnStart:
- Content:
Fn::Base64: !Sub |
# Fix issue with Pandas, see https://github.com/jupyter-incubator/sparkmagic/issues/458
pip install pandas==0.22.0
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum -y install sbt
mkdir -p /home/ec2-user/container
aws s3 cp s3://${S3Bucket}-${AWS::Region}/container/ /home/ec2-user/container/ --recursive
sudo chown -R ec2-user:ec2-user /home/ec2-user/container/
sudo chmod +x /home/ec2-user/container/build_and_push.sh
# Copy sample notebooks
cd /home/ec2-user/SageMaker
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/schema_values.txt .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/1_Predict_Airline_Delays_Using_SageMaker_Spark.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/2_1_Predict_Airline_Delays_Using_Spark.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/2_2_Sagemaker_Model_Deployment.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/3_Flight-Data-Event-Generation.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/4_Run_Athena_Queries.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/Optional_1_Deploy_Existing_Airline_Delay_Prediction_Model.ipynb .
aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/Optional_1_Cleanup_SageMaker_Objects.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/linear_learner_regressor_with_controlled_feature_dimension.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/linear_learner_regressor_with_dynamic_feature_dimension.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/prepare_imdb_movie_dataset.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/validating_predictions_across_linear_learner_kmeans_models.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/random_cut_forest_nytaxi.ipynb .
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/random_cut_forest_temprature.ipynb .
sed -i.bak 's/#s3_data_bucket/${S3Bucket}-${AWS::Region}/g' *.ipynb
sed -i.bak 's/#region/${AWS::Region}/g' *.ipynb
sed -i.bak 's/#s3_output_bucket/${S3OutputBucket}/g' *.ipynb
sed -i.bak 's/#iam_role/arn:aws:iam::${AWS::AccountId}:role\/${SagemakerRole}/g' *.ipynb
rm *.bak
mkdir -p img
#aws s3 cp s3://${S3Bucket}-${AWS::Region}/spark-sagemaker-notebooks/img/ img/ --recursive
#wget https://${S3Bucket}-${AWS::Region}.s3.amazonaws.com/spark-sagemaker-notebooks/k_means_clustering_with_controlled_feature_dimension.ipynb
#wget https://${S3Bucket}-${AWS::Region}.s3.amazonaws.com/spark-sagemaker-notebooks/linear_learner_regressor_with_controlled_feature_dimension.ipynb
#wget https://${S3Bucket}-${AWS::Region}.s3.amazonaws.com/spark-sagemaker-notebooks/linear_learner_regressor_with_dynamic_feature_dimension.ipynb
#wget https://${S3Bucket}-${AWS::Region}.s3.amazonaws.com/spark-sagemaker-notebooks/prepare_imdb_movie_dataset.ipynb
#wget https://${S3Bucket}-${AWS::Region}.s3.amazonaws.com/spark-sagemaker-notebooks/validating_predictions_across_linear_learner_kmeans_models.ipynb
chown ec2-user:ec2-user *
# Update config file with DNS name of EMR cluster
cat >/home/ec2-user/.sparkmagic/config.json <<EOL
{
"kernel_python_credentials" : {
"username": "",
"password": "",
"url": "http://${EmrCluster.MasterPublicDNS}:8998",
"auth": "None"
},
"kernel_scala_credentials" : {
"username": "",
"password": "",
"url": "http://${EmrCluster.MasterPublicDNS}:8998",
"auth": "None"
},
"kernel_r_credentials": {
"username": "",
"password": "",
"url": "http://${EmrCluster.MasterPublicDNS}:8998"
},
"logging_config": {
"version": 1,
"formatters": {
"magicsFormatter": {
"format": "%(asctime)s\t%(levelname)s\t%(message)s",
"datefmt": ""
}
},
"handlers": {
"magicsHandler": {
"class": "hdijupyterutils.filehandler.MagicsFileHandler",
"formatter": "magicsFormatter",
"home_path": "~/.sparkmagic"
}
},
"loggers": {
"magicsLogger": {
"handlers": ["magicsHandler"],
"level": "DEBUG",
"propagate": 0
}
}
},
"wait_for_idle_timeout_seconds": 15,
"livy_session_startup_timeout_seconds": 60,
"fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",
"ignore_ssl_errors": false,
"session_configs": {
"driverMemory": "1000M",
"executorCores": 16,
"executorMemory":"10G"
},
"use_auto_viz": true,
"coerce_dataframe": true,
"max_results_sql": 2500,
"pyspark_dataframe_encoding": "utf-8",
"heartbeat_refresh_seconds": 30,
"livy_server_heartbeat_timeout_seconds": 0,
"heartbeat_retry_seconds": 10,
"server_extension_default_kernel_name": "pysparkkernel",
"custom_headers": {},
"retry_policy": "configurable",
"retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
"configurable_retry_policy_max_retries": 8
}
EOL
SagemakerNotebookInstance:
Type: "AWS::SageMaker::NotebookInstance"
DependsOn:
- SagemakerLifecycleConfig
- SagemakerSecurityGroup
Properties:
DirectInternetAccess: Enabled
SubnetId: !If [PublicSubnetParamExists, !Ref PublicSubnet, !ImportValue 'SparkSagemaker-PublicSubnet']
NotebookInstanceName: !Sub ${EnvironmentName}Notebook
InstanceType: !Ref NotebookInstanceType
LifecycleConfigName: !GetAtt SagemakerLifecycleConfig.NotebookInstanceLifecycleConfigName
RoleArn: !GetAtt SagemakerRole.Arn
SecurityGroupIds:
- !Ref SagemakerSecurityGroup
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Sagemaker Notebook
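# The lifecycle configuration above writes ~/.sparkmagic/config.json so the notebook's PySpark, Spark and
# SparkR kernels reach Livy on the EMR master over port 8998 (opened in EmrAdditionalSecurityGroup). A manual
# connectivity check from a notebook terminal could look like the following, where <EmrMasterNodeDns> is a
# placeholder for the EmrMasterNodeDns stack output:
#   curl -s http://<EmrMasterNodeDns>:8998/sessions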
SagemakerSecurityGroup:
Type: "AWS::EC2::SecurityGroup"
Properties:
GroupName: Spark-SagemakerSecurityGroup
GroupDescription: Security group to control access to Sagemaker
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Sagemaker Security Group
VpcId: !If [VPCParamExists, !Ref Vpc, !ImportValue 'SparkSagemaker-VPC']
Outputs:
EmrMasterNodeDns:
Description: Public DNS name of the master EMR instance
Value: !GetAtt EmrCluster.MasterPublicDNS
SagemakerRoleArn:
Description: ARN of the SageMaker IAM role
Value: !GetAtt SagemakerRole.Arn
SagemakerNotebookInstanceName:
Description: Name of the SageMaker notebook instance
Value: !GetAtt SagemakerNotebookInstance.NotebookInstanceName
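# ---------------------------------------------------------------------------------------------------
# Nested template: cloudformation/stream_infrastructure.template (referenced by StreamInfrastructure above)
# ---------------------------------------------------------------------------------------------------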
AWSTemplateFormatVersion: '2010-09-09'
Description: "Template to set up Kinesis stream, Firehose, Lambda functions,related IAM roles for AWS Lambda Real-time Stream Processing"
Parameters:
LambdaS3Bucket:
Type: String
Default: sparkml-sagemaker-workshop-us-east-1
Description: Name of S3 bucket where Lambda function packages are stored.
LambdaKinesisEventProcessorS3Key:
Type : String
Default : functions
Description : S3 key prefix under which the zipped Kinesis event processor Lambda function packages are stored.
LambdaKinesisEventProcessorEndpoint1FunctionName:
Type : String
Default : InvokeA1SageMakerEndpoint
Description : Name of the Kinesis event processor Lambda function for Endpoint1
LambdaKinesisEventProcessorEndpoint2FunctionName:
Type : String
Default : InvokeA2SageMakerEndpoint
Description : Name of the Kinesis event processor Lambda function for Endpoint2
S3OutputBucket:
Description: S3 output bucket created by the CloudFormation VPC stack
Type: String
Resources:
EventStream:
Type: 'AWS::Kinesis::Stream'
Properties:
Name: 'flight_data_events'
ShardCount: 1
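# A test record can be pushed onto the stream from the CLI to exercise the Lambda consumers below; the
# payload here is only a placeholder, and with AWS CLI v2 the --data value is treated as base64 unless
# --cli-binary-format raw-in-base64-out is passed:
#   aws kinesis put-record --stream-name flight_data_events \
#     --partition-key test --data '{"example": "flight event"}'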
KinesisEventProcessorEndpoint1:
Type: 'AWS::Lambda::Function'
DependsOn:
- KinesisFirehoseDeliveryStream
Properties:
Description: Stream Processing Kinesis Event Processor Endpoint1
FunctionName: !Ref LambdaKinesisEventProcessorEndpoint1FunctionName
Handler:
Fn::Join:
- ""
-
- !Ref LambdaKinesisEventProcessorEndpoint1FunctionName
- ".lambda_handler"
MemorySize: 128
Role: !GetAtt
- EventProcessorExecutionRole
- Arn
Timeout: 10
Runtime: python2.7
Code:
S3Bucket: !Ref LambdaS3Bucket
S3Key:
Fn::Join:
- ""
-
- !Ref LambdaKinesisEventProcessorS3Key
- "/"
- !Ref LambdaKinesisEventProcessorEndpoint1FunctionName
- "/"
- !Ref LambdaKinesisEventProcessorEndpoint1FunctionName
- ".zip"
Environment:
Variables:
KINESIS_FIREHOSE_DELIVERY_STREAM: !Ref 'KinesisFirehoseDeliveryStream'
KinesisEventProcessorEndpoint2:
Type: 'AWS::Lambda::Function'
DependsOn:
- KinesisFirehoseDeliveryStream
Properties:
Description: Stream Processing Kinesis Event Processor Endpoint2
FunctionName: !Ref LambdaKinesisEventProcessorEndpoint2FunctionName
Handler:
Fn::Join:
- ""
-
- !Ref LambdaKinesisEventProcessorEndpoint2FunctionName
- ".lambda_handler"
MemorySize: 128
Role: !GetAtt
- EventProcessorExecutionRole
- Arn
Timeout: 10
Runtime: python2.7
Code:
S3Bucket: !Ref LambdaS3Bucket
S3Key:
Fn::Join:
- ""
-
- !Ref LambdaKinesisEventProcessorS3Key
- "/"
- !Ref LambdaKinesisEventProcessorEndpoint2FunctionName
- "/"
- !Ref LambdaKinesisEventProcessorEndpoint2FunctionName
- ".zip"
Environment:
Variables:
KINESIS_FIREHOSE_DELIVERY_STREAM: !Ref 'KinesisFirehoseDeliveryStream'
SAGEMAKER_A2_ENDPOINT_NAME: 'flightdelays-inference-server-0001'
EventSourceMappingEndpoint1:
Type: AWS::Lambda::EventSourceMapping
DependsOn:
- KinesisEventProcessorEndpoint1
Properties:
EventSourceArn:
Fn::Join:
- ""
-
- "arn:aws:kinesis:"
- Ref: "AWS::Region"
- ":"
- Ref: "AWS::AccountId"
- ":stream/"
- Ref: "EventStream"
FunctionName: !GetAtt 'KinesisEventProcessorEndpoint1.Arn'
StartingPosition: "TRIM_HORIZON"
BatchSize: 1
EventSourceMappingEndpoint2:
Type: AWS::Lambda::EventSourceMapping
DependsOn:
- KinesisEventProcessorEndpoint2
Properties:
EventSourceArn:
Fn::Join:
- ""
-
- "arn:aws:kinesis:"
- Ref: "AWS::Region"
- ":"
- Ref: "AWS::AccountId"
- ":stream/"
- Ref: "EventStream"
FunctionName: !GetAtt 'KinesisEventProcessorEndpoint2.Arn'
StartingPosition: "TRIM_HORIZON"
BatchSize: 1
EventProcessorExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
Policies:
- PolicyName: EventProcessorExecutionPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:*'
Resource: 'arn:aws:logs:*:*:*'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole'
- "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
- "arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess"
- "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
- "arn:aws:iam::aws:policy/AmazonAthenaFullAccess"
KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: 'EventStreamFirehose'
DeliveryStreamType: DirectPut
S3DestinationConfiguration:
BucketARN: !Join
- ''
- - 'arn:aws:s3:::'
- !Ref S3OutputBucket
Prefix: kinesis-fh/data/
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 100
CloudWatchLoggingOptions:
Enabled: 'false'
CompressionFormat: GZIP
EncryptionConfiguration:
NoEncryptionConfig: NoEncryption
RoleARN: !GetAtt 'FirehoseDeliveryIAMRole.Arn'
DependsOn:
- FirehoseDeliveryIAMPolicy
FirehoseDeliveryIAMRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
FirehoseDeliveryIAMPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: 'EventStreamFirehoseIAMPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Join
- ''
- - 'arn:aws:s3:::'
- !Ref S3OutputBucket
- !Join
- ''
- - 'arn:aws:s3:::'
- !Ref S3OutputBucket
- '/*'
Roles:
- !Ref 'FirehoseDeliveryIAMRole'
Outputs:
KinesisStream:
Value: !Ref EventStream
Description: The Kinesis stream used for ingestion.
KinesisFirehoseDeliveryStream:
Value: !Ref KinesisFirehoseDeliveryStream
Description: The Kinesis Firehose delivery stream that writes events to S3.
Region:
Value: !Ref 'AWS::Region'
Description: The region this template was launched in.
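# ---------------------------------------------------------------------------------------------------
# Nested template: cloudformation/vpc.template (referenced by VpcStack above)
# ---------------------------------------------------------------------------------------------------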
Description: >
This template deploys a VPC with a public subnet (Internet Gateway), a private subnet (NAT Gateway for
outbound Internet access), default routes, a bastion instance, and an output S3 bucket.
Parameters:
EnvironmentName:
Description: An environment name that will be prefixed to resource names
Type: String
VpcCIDR:
Description: Please enter the IP range (CIDR notation) for this VPC
Type: String
Default: 10.192.0.0/16
PublicSubnetCIDR:
Description: Please enter the IP range (CIDR notation) for the public subnet
Type: String
Default: 10.192.10.0/24
PrivateSubnetCIDR:
Description: Please enter the IP range (CIDR notation) for the private subnet in the first Availability Zone
Type: String
Default: 10.192.20.0/24
KeyName:
Description: SSH key (for access to bastion and EMR instances)
Type: AWS::EC2::KeyPair::KeyName
Default: qwiklabskey
# OutputBucket:
# Description: Enter a unique name for S3 bucket to hold EMR cluster logs
# Type: String
Mappings:
Region2AMI:
ap-south-1:
AmazonLinux: ami-531a4c3c
eu-west-3:
AmazonLinux: ami-8ee056f3
eu-west-2:
AmazonLinux: ami-403e2524
eu-west-1:
AmazonLinux: ami-d834aba1
ap-northeast-3:
AmazonLinux: ami-83444afe
ap-northeast-2:
AmazonLinux: ami-863090e8
ap-northeast-1:
AmazonLinux: ami-ceafcba8
sa-east-1:
AmazonLinux: ami-84175ae8
ca-central-1:
AmazonLinux: ami-a954d1cd
ap-southeast-1:
AmazonLinux: ami-68097514
ap-southeast-2:
AmazonLinux: ami-942dd1f6
eu-central-1:
AmazonLinux: ami-5652ce39
us-east-1:
AmazonLinux: ami-97785bed
us-east-2:
AmazonLinux: ami-f63b1193
us-west-1:
AmazonLinux: ami-824c4ee2
us-west-2:
AmazonLinux: ami-f2d3638a
Resources:
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: !Ref VpcCIDR
EnableDnsHostnames: true
Tags:
- Key: Name
Value: !Ref EnvironmentName
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: !Ref EnvironmentName
InternetGatewayAttachment:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
InternetGatewayId: !Ref InternetGateway
VpcId: !Ref VPC
PublicSubnet:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
AvailabilityZone: !Select [ 0, !GetAZs '' ]
CidrBlock: !Ref PublicSubnetCIDR
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Public Subnet
PrivateSubnet:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
AvailabilityZone: !Select [ 0, !GetAZs '' ]
CidrBlock: !Ref PrivateSubnetCIDR
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Private Subnet
NatGatewayEIP:
Type: AWS::EC2::EIP
DependsOn: InternetGatewayAttachment
Properties:
Domain: vpc
NatGateway:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatGatewayEIP.AllocationId
SubnetId: !Ref PublicSubnet
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Public Routes
DefaultPublicRoute:
Type: AWS::EC2::Route
DependsOn: InternetGatewayAttachment
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway
PublicSubnetRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PublicRouteTable
SubnetId: !Ref PublicSubnet
PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Private Routes
DefaultPrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NatGateway
PrivateSubnetRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable
SubnetId: !Ref PrivateSubnet
BastionInstance:
Type: "AWS::EC2::Instance"
DependsOn: VPC
Properties:
ImageId: !FindInMap [Region2AMI, !Ref "AWS::Region", "AmazonLinux"]
InstanceType: t2.micro
KeyName: !Ref KeyName
SecurityGroupIds:
- !Ref BastionAccessSecurityGroup
SubnetId: !Ref PublicSubnet
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Bastion
BastionAccessSecurityGroup:
Type: "AWS::EC2::SecurityGroup"
DependsOn: VPC
Properties:
GroupName: BastionAccess
GroupDescription: Allow SSH access to Bastion instance
SecurityGroupIngress:
- CidrIp: "0.0.0.0/0"
Description: SSH
IpProtocol: tcp
FromPort: 22
ToPort: 22
Tags:
- Key: Name
Value: !Sub ${EnvironmentName} Bastion Security Group
VpcId: !Ref VPC
OutputS3Bucket:
Type: "AWS::S3::Bucket"
#DeletionPolicy: Retain
#Properties:
# BucketName: !Join ['', ['sparkml-sagemaker-workshop', '-', !Ref 'AWS::StackId']]
DataOutputBucket:
Properties:
ServiceToken: !GetAtt DeleteS3ObjectsFunction.Arn
Bucket: !Ref OutputS3Bucket
Type: "AWS::CloudFormation::CustomResource"
S3DeleteRole:
Type: AWS::IAM::Role
Properties:
Path: /spark-sagemaker/
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
-
PolicyName: S3Access
PolicyDocument:
Version: 2012-10-17
Statement:
-
Sid: AllowLogging
Effect: Allow
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:PutLogEvents"
Resource: "*"
-
Sid: SourceBucketReadAccess
Effect: Allow
Action:
- "s3:ListBucket"
- "s3:GetObject"
Resource:
- !Sub "arn:aws:s3:::${OutputS3Bucket}"
- !Sub "arn:aws:s3:::${OutputS3Bucket}/*"
-
Sid: DestBucketWriteAccess
Effect: Allow
Action:
- "s3:ListBucket"
- "s3:GetObject"
- "s3:PutObject"
- "s3:PutObjectAcl"
- "s3:PutObjectVersionAcl"
- "s3:DeleteObject"
- "s3:DeleteObjectVersion"
Resource:
- !Sub "arn:aws:s3:::${OutputS3Bucket}"
- !Sub "arn:aws:s3:::${OutputS3Bucket}/*"
DeleteS3ObjectsFunction:
Type: AWS::Lambda::Function
DependsOn: OutputS3Bucket
Properties:
Description: Deletes objects from the S3 bucket for stack deletion
Handler: index.handler
Runtime: python2.7
Role: !GetAtt S3DeleteRole.Arn
Timeout: 120
Code:
ZipFile: |
  import json
  import logging
  import boto3
  import cfnresponse
  from botocore.exceptions import ClientError

  client = boto3.client('s3')
  logger = logging.getLogger()
  logger.setLevel(logging.INFO)

  def handler(event, context):
      logger.info("Received event: %s" % json.dumps(event))
      bucket = event['ResourceProperties']['Bucket']
      result = cfnresponse.SUCCESS
      try:
          # Only empty the bucket when the custom resource (i.e. the stack) is being deleted
          if event['RequestType'] == 'Delete':
              result = delete_objects(bucket)
      except ClientError as e:
          logger.error('Error: %s', e)
          result = cfnresponse.FAILED
      cfnresponse.send(event, context, result, {})

  def delete_objects(bucket):
      # List every object in the bucket and delete them so the bucket itself can be removed
      paginator = client.get_paginator('list_objects_v2')
      page_iterator = paginator.paginate(Bucket=bucket)
      objects = [{'Key': x['Key']} for page in page_iterator if 'Contents' in page for x in page['Contents']]
      if objects:
          client.delete_objects(Bucket=bucket, Delete={'Objects': objects})
      return cfnresponse.SUCCESS
Outputs:
VPC:
Description: A reference to the created VPC
Value: !Ref VPC
Export:
Name: SparkSagemaker-VPC
PublicSubnet:
Description: Public subnet
Value: !Ref PublicSubnet
Export:
Name: SparkSagemaker-PublicSubnet
PrivateSubnet:
Description: Private subnet
Value: !Ref PrivateSubnet
Export:
Name: SparkSagemaker-PrivateSubnet
BastionSecurityGroup:
Description: Security group of bastion host
Value: !Ref BastionAccessSecurityGroup
Export:
Name: SparkSagemaker-BastionAccessSecurityGroup
BastionIp:
Description: IP address of the bastion instance
Value: !GetAtt BastionInstance.PublicIp
Export:
Name: SparkSagemaker-BastionPublicIp
OutputS3Bucket:
Description: S3 bucket
Value: !Ref OutputS3Bucket
Export:
Name: SparkSagemaker-OutputS3Bucket
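# The SparkSagemaker-* exports above are what the emr_sagemaker template falls back to (via !ImportValue)
# when its Vpc and PublicSubnet parameters are left blank. The exports currently defined in a region can be
# listed with, for example:
#   aws cloudformation list-exports --query 'Exports[?starts_with(Name, `SparkSagemaker`)]'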