@koba1007
Last active December 16, 2019 22:41
MDLC-CFn.yaml
{
"Description": "AWS CloudFormation MDLC Sample Template.",
"Parameters": {
"PandasLambdaLayerArtifactKey": {
"Default": "python-libs/python36-pandas24.zip",
"Type": "String"
},
"SklearnLambdaLayerArtifactKey": {
"Default": "python-libs/python36-sklearn203.zip",
"Type": "String"
},
"MDLCWorkshopPublicArtifactBucket": {
"Default": "kk-public-ws-bucket",
"Type": "String"
},
"InputDataKey": {
"Default": "input-data",
"Type": "String"
}
},
"AWSTemplateFormatVersion": "2010-09-09",
"Outputs": {
"StackArn": {
"Description": "Don't remove this output!",
"Value": {
"Ref": "AWS::StackId"
}
}
},
"Resources": {
"RegisterModelLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst dynamodb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const registerModel = async(event) => {\n // get event parameters\n const modelName = event.ModelName;\n const sageMakerModelName = event.SageMakerModelName;\n const timestamp = DateUtil.formatDateTimeString(\n DateUtil.getDateFromUtcString(event.Timestamp));\n\n // build item\n var params = {\n TableName: \"MODEL_REGISTRY\",\n Item: {\n \"MODEL_NAME\": modelName,\n \"SAGEMAKER_MODEL_NAME\": sageMakerModelName,\n \"MODEL_TIMESTAMP\": timestamp\n }\n };\n\n // put item\n try {\n await dynamodb.put(params).promise();\n } catch (error) {\n return {\n statusCode: 400,\n error: `Could not post: ${error.stack}`\n };\n }\n\n // build return object\n const ret = {\n ModelName: modelName,\n SageMakerModelName: sageMakerModelName,\n Timestamp: timestamp\n };\n\n return ret;\n };\n\n return registerModel(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n"
},
"Description": "Register a SageMaker Model",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"RegisterModelLambdaFunctionRole",
"Arn"
]
},
"Timeout": 60,
"Runtime": "nodejs8.10",
"FunctionName": "mdlc-training-register-model"
}
},
"InitializeBatchInferenceWorkflowLambdaFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"FindModelLambdaFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"SageMakerTrainingRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"sagemaker.amazonaws.com"
]
}
}
]
}
}
},
"BatchInferenceWorkflowRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/AWSLambdaFullAccess",
"arn:aws:iam::aws:policy/service-role/AWSLambdaRole",
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
"arn:aws:iam::aws:policy/AWSBatchFullAccess",
"arn:aws:iam::aws:policy/AmazonS3FullAccess",
"arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"states.amazonaws.com"
]
}
}
]
}
}
},
"InitializeTrainingWorkflowLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst s3 = new AWS.S3();\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const initializeWorkflow = async(event) => {\n // get event parameters\n const modelName = event.WorkflowInput.ModelName;\n const s3RootBucket = event.S3RootBucket;\n const dataDate = DateUtil.getDateFromUtcString(event.WorkflowInput.DataDate);\n\n // generate training job name\n const runDate = new Date();\n const trainingJobName = `${modelName}-${DateUtil.formatDateTimeStringShort(runDate)}`;\n\n // generate training input folder\n const trainingInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/train`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/train`\n };\n\n // generate training validation folder\n const validationInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/validation`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/validation`\n };\n\n // generate training output folder\n const trainingOutput = {\n S3Key: `model/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`,\n S3Uri: `s3://${s3RootBucket}/model/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`\n }\n\n // generate training config\n const trainingConfig = {\n TrainingJobName: trainingJobName,\n TrainingInput: trainingInput,\n ValidationInput: validationInput,\n TrainingOutput: trainingOutput\n };\n\n // upload input request to S3\n const s3UploadParams = {\n Bucket: s3RootBucket,\n Key: 'workflow_request.json',\n Body: JSON.stringify(event, null, 2),\n ContentType: 'application/json'\n };\n await s3.putObject(s3UploadParams);\n\n // build return object\n const ret = {\n WorkflowRequest: event.WorkflowInput,\n S3RootBucket: s3RootBucket,\n DataDate: dataDate,\n RunDate: runDate,\n TrainingConfig: trainingConfig\n };\n\n return ret;\n };\n\n return initializeWorkflow(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n"
},
"Description": "Initializes a Training ML workflow",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"InitializeTrainingWorkflowLambdaFunctionRole",
"Arn"
]
},
"Timeout": 60,
"Runtime": "nodejs8.10",
"FunctionName": "mdlc-training-initialize-workflow"
}
},
"MDLCBatchInferenceWorkflow": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"RoleArn": {
"Fn::GetAtt": [
"BatchInferenceWorkflowRole",
"Arn"
]
},
"DefinitionString": {
"Fn::Sub": "{\n \"StartAt\": \"Initialize\",\n \"States\": {\n \"Initialize\": {\n \"Type\": \"Task\",\n \"Resource\": \"${InitializeBatchInferenceWorkflowLambdaFunction.Arn}\",\n \"Parameters\": {\n \"WorkflowInput.$\": \"$\",\n \"S3RootBucket\": \"${MDLCS3Bucket}\"\n },\n \"ResultPath\": \"$\",\n \"Next\": \"Find Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Find Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"${FindModelLambdaFunction.Arn}\",\n \"Parameters\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\"\n },\n \"ResultPath\": \"$.FindModelOutput\",\n \"Next\": \"Inference\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Inference\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createTransformJob.sync\",\n \"Parameters\": {\n \"TransformJobName.$\": \"$.InferenceConfig.InferenceJobName\",\n \"ModelName.$\": \"$.FindModelOutput.SageMakerModelName\",\n \"TransformInput\": {\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.InferenceConfig.InferenceInput.S3Uri\"\n }\n },\n \"ContentType\": \"text/csv\",\n \"SplitType\": \"Line\",\n \"CompressionType\": \"None\"\n },\n \"TransformOutput\": {\n \"S3OutputPath.$\": \"$.InferenceConfig.InferenceOutput.S3Uri\",\n \"Accept\": \"text/csv\",\n \"AssembleWith\": \"Line\"\n },\n \"TransformResources\": {\n \"InstanceType\": \"ml.c4.2xlarge\",\n \"InstanceCount\": 10\n }\n },\n \"ResultPath\": \"$.InferenceOutput\",\n \"Next\": \"Monitor Model Performance\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Monitor Model Performance\": {\n \"Type\": \"Task\",\n \"Resource\": \"${MonitorModelPerformanceLambdaFunction.Arn}\",\n \"Parameters\": {\n \"PredictionInput.$\": \"$.MonitorConfig.MonitorInput.PredictionS3Uri\",\n \"TargetInput.$\": \"$.MonitorConfig.MonitorInput.TargetS3Uri\"\n },\n \"ResultPath\": \"$.MonitoringOutput\",\n \"Next\": \"Is Re-training Needed?\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Is Re-training Needed?\": {\n \"Type\": \"Choice\",\n \"Choices\": [{\n \"Variable\": \"$.MonitoringOutput.ModelPerformance\",\n \"NumericLessThan\": 1.0,\n \"Next\": \"Re-train Model\"\n }, {\n \"Variable\": \"$.MonitoringOutput.ModelPerformance\",\n \"NumericGreaterThanEquals\": 1.0,\n \"Next\": \"Finalize\"\n }],\n \"Default\": \"Finalize\"\n },\n \"Re-train Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::states:startExecution.sync\",\n \"Parameters\": {\n \"Input\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\",\n \"DataDate.$\": \"$.WorkflowRequest.DataDate\"\n },\n \"StateMachineArn\": \"${MDLCTrainingWorkflow}\"\n },\n \"Next\": \"Finalize\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Finalize\": {\n \"Type\": \"Pass\",\n \"End\": true\n },\n \"Handle Error\": {\n \"Type\": \"Pass\",\n \"Next\": \"Failure\"\n },\n \"Failure\": {\n \"Type\": \"Fail\"\n }\n }\n}"
},
"StateMachineName": "mdlc-batch-inference-workflow"
}
},
"TrainingWorkflowRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/AWSLambdaFullAccess",
"arn:aws:iam::aws:policy/service-role/AWSLambdaRole",
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"states.amazonaws.com"
]
}
}
]
}
}
},
"MonitorModelPerformanceLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Layers": [
{
"Ref": "SklearnLayer"
},
{
"Ref": "PandasLayer"
}
],
"Code": {
"ZipFile": "import boto3\nimport os\n\nimport numpy as np\nimport pandas as pd\n\nfrom sklearn.metrics import roc_auc_score\n\n\ns3_client = boto3.client('s3')\n\ndef lambda_handler(event, context):\n\n ## load the dataset date and s3 input path\n # dataset_date = event['DatasetDate']\n # run_date = event['RunDate']\n preduction_path = event['PredictionInput']\n target_path = event['TargetInput']\n\n ## read the target and inference result\n target = pd.read_csv(target_path, header=None, names=['target'])\n prediction = pd.read_csv(preduction_path, header=None, names=['prediction'])\n\n ## compute the monitoring result\n monitoring_result = roc_auc_score(target.values, prediction.values)\n\n # print('By {}, for the dataset date {}, the AUC is {}'.format(run_date, dataset_date, monitoring_result))\n\n return {'ModelPerformance': monitoring_result}\n"
},
"Description": "Monitor SageMaker model performance",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.lambda_handler",
"Role": {
"Fn::GetAtt": [
"MonitorModelPerformanceLambdaFunctionRole",
"Arn"
]
},
"Timeout": 900,
"Runtime": "python3.6",
"FunctionName": "mdlc-monitoring-model-performance"
}
},
"SklearnLayer": {
"Type": "AWS::Lambda::LayerVersion",
"Properties": {
"Content": {
"S3Bucket": {
"Ref": "MDLCWorkshopPublicArtifactBucket"
},
"S3Key": {
"Ref": "SklearnLambdaLayerArtifactKey"
}
},
"LayerName": "sklearn-layer",
"Description": "numpy 1-14-3 scipy 1-1-0 skleran 0-20-3",
"CompatibleRuntimes": [
"python3.6"
]
}
},
"MDLCTrainingWorkflow": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"RoleArn": {
"Fn::GetAtt": [
"TrainingWorkflowRole",
"Arn"
]
},
"DefinitionString": {
"Fn::Sub": "{\n \"StartAt\": \"Initialize\",\n \"States\": {\n \"Initialize\": {\n \"Type\": \"Task\",\n \"Resource\": \"${InitializeTrainingWorkflowLambdaFunction.Arn}\",\n \"Parameters\": {\n \"WorkflowInput.$\": \"$\",\n \"S3RootBucket\": \"${MDLCS3Bucket}\"\n },\n \"ResultPath\": \"$\",\n \"Next\": \"Train Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Train Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createTrainingJob.sync\",\n \"Parameters\": {\n \"TrainingJobName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"AlgorithmSpecification\": {\n \"TrainingImage\": \"433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1\",\n \"TrainingInputMode\": \"File\"\n },\n \"RoleArn\": \"${SageMakerTrainingRole.Arn}\",\n \"HyperParameters\": {\n \"objective\": \"binary:logistic\",\n \"colsample_bytree\": \"0.7\",\n \"max_depth\": \"4\",\n \"eta\": \"0.2\",\n \"gamma\": \"4\",\n \"min_child_weight\": \"6\",\n \"subsample\": \"0.7\",\n \"learning_rate\": \"0.075\",\n \"silent\": \"0\",\n \"num_round\": \"200\",\n \"seed\": \"0\"\n },\n \"InputDataConfig\": [\n {\n \"ChannelName\": \"train\",\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.TrainingConfig.TrainingInput.S3Uri\",\n \"S3DataDistributionType\": \"FullyReplicated\"\n }\n },\n \"CompressionType\": \"None\",\n \"RecordWrapperType\": \"None\",\n \"ContentType\": \"text/csv\"\n },\n {\n \"ChannelName\": \"validation\",\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.TrainingConfig.ValidationInput.S3Uri\",\n \"S3DataDistributionType\": \"FullyReplicated\"\n }\n },\n \"CompressionType\": \"None\",\n \"RecordWrapperType\": \"None\",\n \"ContentType\": \"text/csv\"\n }\n ],\n \"OutputDataConfig\": {\n \"S3OutputPath.$\": \"$.TrainingConfig.TrainingOutput.S3Uri\"\n },\n \"ResourceConfig\": {\n \"InstanceCount\": 1,\n \"InstanceType\": \"ml.m5.4xlarge\",\n \"VolumeSizeInGB\": 50\n },\n \"StoppingCondition\": {\n \"MaxRuntimeInSeconds\": 3600\n }\n },\n \"ResultPath\": \"$.TrainingOutput\",\n \"Next\": \"Create Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Create Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createModel\",\n \"Parameters\": {\n \"ModelName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"ExecutionRoleArn\": \"${SageMakerTrainingRole.Arn}\",\n \"PrimaryContainer\": {\n \"Image\": \"433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1\",\n \"ModelDataUrl.$\": \"$.TrainingOutput.ModelArtifacts.S3ModelArtifacts\"\n }\n },\n \"ResultPath\": \"$.CreateModelOutput\",\n \"Next\": \"Register Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Register Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"${RegisterModelLambdaFunction.Arn}\",\n \"Parameters\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\",\n \"SageMakerModelName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"Timestamp.$\": \"$.RunDate\"\n },\n \"ResultPath\": \"$.RegisterModelOutput\",\n \"Next\": \"Finalize\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Finalize\": {\n \"Type\": \"Pass\",\n \"End\": true\n },\n \"Handle Error\": {\n \"Type\": \"Pass\",\n 
\"Next\": \"Failure\"\n },\n \"Failure\": {\n \"Type\": \"Fail\"\n }\n }\n}"
},
"StateMachineName": "mdlc-training-workflow"
}
},
"InitializeBatchInferenceWorkflowLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst s3 = new AWS.S3();\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const initializeWorkflow = async(event) => {\n // get event parameters\n const modelName = event.WorkflowInput.ModelName;\n const dataDate = DateUtil.getDateFromUtcString(event.WorkflowInput.DataDate);\n const s3RootBucket = event.S3RootBucket;\n\n // generate inference job name\n const runDate = new Date();\n const inferenceJobName = `${modelName}-${DateUtil.formatDateTimeStringShort(runDate)}`;\n\n // generate inference input folder\n const inferenceInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/holdout_data`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/holdout_data`\n };\n\n // generate inference output folder\n const inferenceOutput = {\n S3Key: `output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`,\n S3Uri: `s3://${s3RootBucket}/output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`\n };\n\n // generate inference config\n const inferenceConfig = {\n InferenceJobName: inferenceJobName,\n InferenceInput: inferenceInput,\n InferenceOutput: inferenceOutput\n };\n\n // generate holdout target input folder\n const monitorInput = {\n TargetS3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/holdout_target/holdout_target.csv`,\n PredictionS3Uri: `s3://${s3RootBucket}/output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}/holdout_data.csv.out`\n }\n\n // generate monitor config\n const monitorConfig = {\n MonitorInput: monitorInput\n };\n\n // upload input request to S3\n const s3UploadParams = {\n Bucket: s3RootBucket,\n Key: 'workflow_request.json',\n Body: JSON.stringify(event, null, 2),\n ContentType: 'application/json'\n };\n await s3.putObject(s3UploadParams);\n\n // build return object\n const ret = {\n WorkflowRequest: event.WorkflowInput,\n S3RootBucket: s3RootBucket,\n DataDate: dataDate,\n RunDate: runDate,\n InferenceConfig: inferenceConfig,\n MonitorConfig: monitorConfig\n };\n\n return ret;\n };\n\n return initializeWorkflow(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? 
'0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n"
},
"Description": "Initializes a Batch Inference ML workflow",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"InitializeBatchInferenceWorkflowLambdaFunctionRole",
"Arn"
]
},
"Timeout": 60,
"Runtime": "nodejs8.10",
"FunctionName": "mdlc-batch-inference-initialize-workflow"
}
},
"PandasLayer": {
"Type": "AWS::Lambda::LayerVersion",
"Properties": {
"Content": {
"S3Bucket": {
"Ref": "MDLCWorkshopPublicArtifactBucket"
},
"S3Key": {
"Ref": "PandasLambdaLayerArtifactKey"
}
},
"LayerName": "pandas-layer",
"Description": "pandas 0-24-2 pyst 2018-4 s3fs 0-1-5",
"CompatibleRuntimes": [
"python3.6"
]
}
},
"CopyS3ResourceFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"FindModelLambdaFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst dynamodb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const findModel = async(event) => {\n // get event parameters\n const modelName = event.ModelName;\n\n // build item\n var params = {\n TableName: \"MODEL_REGISTRY\",\n KeyConditionExpression: 'MODEL_NAME = :ModelName',\n ProjectionExpression: 'SAGEMAKER_MODEL_NAME',\n ExpressionAttributeValues: {\n ':ModelName': modelName\n },\n ConsistentRead: true,\n ScanIndexForward: false,\n Limit: 1\n };\n\n // put item\n let data;\n try {\n data = await dynamodb.query(params).promise();\n } catch (error) {\n return {\n statusCode: 400,\n error: `Could not post: ${error.stack}`\n };\n }\n\n if (data.Items.length == 0) {\n throw \"Unable to find model\";\n }\n\n // build return object\n const ret = {\n SageMakerModelName: data.Items[0]['SAGEMAKER_MODEL_NAME']\n };\n\n return ret;\n };\n\n return findModel(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n"
},
"Description": "Find the latest SageMaker Model",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"FindModelLambdaFunctionRole",
"Arn"
]
},
"Timeout": 60,
"Runtime": "nodejs8.10",
"FunctionName": "mdlc-batch-inference-find-model"
}
},
"MonitorModelPerformanceLambdaFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"CopyS3ResourceFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "import base64\nimport boto3\nimport httplib\nimport json\nimport os\n\nfrom urllib2 import build_opener, HTTPHandler, Request\n\n\ns3_client = boto3.client(\"s3\")\n\ndef sendResponse(event, context, status, message):\n bucket = event[\"ResourceProperties\"].get(\"Target\", {}).get(\"Bucket\")\n key = event[\"ResourceProperties\"].get(\"Target\", {}).get(\"Key\")\n\n body = json.dumps({\n \"Status\": status,\n \"Reason\": message,\n \"StackId\": event['StackId'],\n \"RequestId\": event['RequestId'],\n \"LogicalResourceId\": event['LogicalResourceId'],\n \"PhysicalResourceId\": \"s3://{}/{}\".format(bucket, key),\n \"Data\": {\n \"Bucket\": bucket,\n \"Key\": key,\n },\n })\n\n request = Request(event['ResponseURL'], data=body)\n request.add_header('Content-Type', '')\n request.add_header('Content-Length', len(body))\n request.get_method = lambda: 'PUT'\n\n opener = build_opener(HTTPHandler)\n response = opener.open(request)\n\ndef handler(event, context):\n print(\"Received request:\", json.dumps(event, indent=4))\n\n request = event[\"RequestType\"]\n properties = event[\"ResourceProperties\"]\n print properties\n\n if \"Target\" not in properties or all(prop not in properties for prop in [\"Body\", \"Base64Body\", \"Source\"]):\n return sendResponse(event, context, \"FAILED\", \"Missing required parameters\")\n\n target = properties[\"Target\"]\n\n sink_bucket = target[\"Bucket\"]\n sink_prefix = target[\"Key\"]\n\n if request in (\"Create\", \"Update\"):\n if \"Body\" in properties:\n target.update({\n \"Body\": properties[\"Body\"],\n })\n\n s3_client.put_object(**target)\n\n elif \"Base64Body\" in properties:\n try:\n body = base64.b64decode(properties[\"Base64Body\"])\n except:\n return sendResponse(event, context, \"FAILED\", \"Malformed Base64Body\")\n\n target.update({\n \"Body\": body\n })\n\n s3_client.put_object(**target)\n\n elif \"Source\" in properties:\n source = properties[\"Source\"]\n source_bucket = source[\"Bucket\"]\n source_prefix = source[\"Key\"]\n\n paginator = s3_client.get_paginator(\"list_objects_v2\")\n page_iterator = paginator.paginate(Bucket=source_bucket, Prefix=source_prefix)\n\n for source_key in {x['Key'] for page in page_iterator for x in page[\"Contents\"]}:\n sink_key = os.path.join(sink_prefix, os.path.relpath(source_key, source_prefix))\n print \"copy {} to {}\".format(source_key, sink_key)\n\n if not source_key.endswith(\"/\"):\n print \"copy {} to {}\".format(source_key, sink_key)\n s3_client.copy_object(\n CopySource={\"Bucket\": source_bucket, \"Key\": source_key},\n Bucket=sink_bucket,\n Key = sink_key,\n MetadataDirective=\"COPY\",\n TaggingDirective=\"COPY\"\n )\n\n else:\n return sendResponse(event, context, \"FAILED\", \"Malformed body\")\n\n return sendResponse(event, context, \"SUCCESS\", \"Created\")\n\n if request == \"Delete\":\n\n paginator = s3_client.get_paginator(\"list_objects_v2\")\n page_iterator = paginator.paginate(Bucket=sink_bucket, Prefix=sink_prefix)\n sink_objects = [{'Key': x['Key']} for page in page_iterator for x in page['Contents']]\n s3_client.delete_objects(\n Bucket=sink_bucket,\n Delete={'Objects': sink_objects}\n )\n\n return sendResponse(event, context, \"SUCCESS\", \"Deleted\")\n\n return sendResponse(event, context, \"FAILED\", \"Unexpected: {}\".format(request))\n"
},
"Description": "Copy S3 Input Object",
"Tags": [
{
"Value": "SAM",
"Key": "lambda:createdBy"
}
],
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"CopyS3ResourceFunctionRole",
"Arn"
]
},
"Timeout": 900,
"Runtime": "python2.7",
"FunctionName": "mdlc-copy-object"
}
},
"RegisterModelLambdaFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"InputDataObject": {
"Type": "Custom::S3Object",
"Properties": {
"Source": {
"Bucket": {
"Ref": "MDLCWorkshopPublicArtifactBucket"
},
"Key": "input-data"
},
"ServiceToken": {
"Fn::GetAtt": "CopyS3ResourceFunction.Arn"
},
"Target": {
"Bucket": {
"Ref": "MDLCS3Bucket"
},
"Key": "input-data"
}
}
},
"MDLCS3Bucket": {
"Type": "AWS::S3::Bucket"
},
"InitializeTrainingWorkflowLambdaFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/AmazonS3FullAccess"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
]
}
}
},
"ModelTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "MODEL_NAME"
},
{
"KeyType": "RANGE",
"AttributeName": "MODEL_TIMESTAMP"
}
],
"TableName": "MODEL_REGISTRY",
"AttributeDefinitions": [
{
"AttributeName": "MODEL_NAME",
"AttributeType": "S"
},
{
"AttributeName": "MODEL_TIMESTAMP",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST"
}
}
}
}
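
Deploying and running the stack: below is a minimal sketch, assuming the template above is saved locally as MDLC-CFn.yaml and the default artifact bucket in Parameters is readable from your account. The state machine name mdlc-training-workflow and the input fields ModelName and DataDate come from the template; the stack name mdlc-workshop and the example input values are hypothetical placeholders. Note that the hard-coded XGBoost training image (433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1) ties the stack to us-west-2.

import json

import boto3

STACK_NAME = "mdlc-workshop"  # hypothetical stack name

cfn = boto3.client("cloudformation")
sfn = boto3.client("stepfunctions")
sts = boto3.client("sts")

# The gist file is JSON despite the .yaml extension; CloudFormation accepts either format.
with open("MDLC-CFn.yaml") as f:
    template_body = f.read()

# CAPABILITY_IAM is required because the template creates IAM roles.
cfn.create_stack(
    StackName=STACK_NAME,
    TemplateBody=template_body,
    Capabilities=["CAPABILITY_IAM"],
)
cfn.get_waiter("stack_create_complete").wait(StackName=STACK_NAME)

# Build the ARN of the training state machine defined in the template.
region = boto3.session.Session().region_name
account = sts.get_caller_identity()["Account"]
state_machine_arn = "arn:aws:states:{}:{}:stateMachine:mdlc-training-workflow".format(
    region, account
)

# The initializer Lambda reads ModelName and DataDate from the workflow input;
# the values below are examples only.
sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps({"ModelName": "my-model", "DataDate": "2019-12-01"}),
)

The batch inference workflow (mdlc-batch-inference-workflow) accepts the same input shape: it looks up the latest registered model in the MODEL_REGISTRY table, runs a SageMaker batch transform over the holdout data, scores the predictions with the monitoring Lambda, and re-enters the training workflow when the measured AUC falls below the threshold in the Choice state.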