Last active
December 16, 2019 22:41
-
-
Save koba1007/789811c1be3ebc4ae59e4eaf0689a74f to your computer and use it in GitHub Desktop.
MDLC-CFn.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"Description": "AWS CloudFormation MDLC Sample Template.", | |
"Parameters": { | |
"PandasLambdaLayerArtifactKey": { | |
"Default": "python-libs/python36-pandas24.zip", | |
"Type": "String" | |
}, | |
"SklearnLambdaLayerArtifactKey": { | |
"Default": "python-libs/python36-sklearn203.zip", | |
"Type": "String" | |
}, | |
"MDLCWorkshopPublicArtifactBucket": { | |
"Default": "kk-public-ws-bucket", | |
"Type": "String" | |
}, | |
"InputDataKey": { | |
"Default": "input-data", | |
"Type": "String" | |
} | |
}, | |
"AWSTemplateFormatVersion": "2010-09-09", | |
"Outputs": { | |
"StackArn": { | |
"Description": "Don't remove this output!", | |
"Value": { | |
"Ref": "AWS::StackId" | |
} | |
} | |
}, | |
"Resources": { | |
"RegisterModelLambdaFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Code": { | |
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst dynamodb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const registerModel = async(event) => {\n // get event parameters\n const modelName = event.ModelName;\n const sageMakerModelName = event.SageMakerModelName;\n const timestamp = DateUtil.formatDateTimeString(\n DateUtil.getDateFromUtcString(event.Timestamp));\n\n // build item\n var params = {\n TableName: \"MODEL_REGISTRY\",\n Item: {\n \"MODEL_NAME\": modelName,\n \"SAGEMAKER_MODEL_NAME\": sageMakerModelName,\n \"MODEL_TIMESTAMP\": timestamp\n }\n };\n\n // put item\n try {\n await dynamodb.put(params).promise();\n } catch (error) {\n return {\n statusCode: 400,\n error: `Could not post: ${error.stack}`\n };\n }\n\n // build return object\n const ret = {\n ModelName: modelName,\n SageMakerModelName: sageMakerModelName,\n Timestamp: timestamp\n };\n\n return ret;\n };\n\n return registerModel(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n" | |
}, | |
"Description": "Register a SageMaker Model", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"RegisterModelLambdaFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 60, | |
"Runtime": "nodejs8.10", | |
"FunctionName": "mdlc-training-register-model" | |
} | |
}, | |
"InitializeBatchInferenceWorkflowLambdaFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"FindModelLambdaFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"SageMakerTrainingRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"sagemaker.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"BatchInferenceWorkflowRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/AWSLambdaFullAccess", | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaRole", | |
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", | |
"arn:aws:iam::aws:policy/AWSBatchFullAccess", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess", | |
"arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"states.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"InitializeTrainingWorkflowLambdaFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Code": { | |
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst s3 = new AWS.S3();\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const initializeWorkflow = async(event) => {\n // get event parameters\n const modelName = event.WorkflowInput.ModelName;\n const s3RootBucket = event.S3RootBucket;\n const dataDate = DateUtil.getDateFromUtcString(event.WorkflowInput.DataDate);\n\n // generate training job name\n const runDate = new Date();\n const trainingJobName = `${modelName}-${DateUtil.formatDateTimeStringShort(runDate)}`;\n\n // generate training input folder\n const trainingInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/train`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/train`\n };\n\n // generate training validation folder\n const validationInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/validation`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/validation`\n };\n\n // generate training output folder\n const trainingOutput = {\n S3Key: `model/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`,\n S3Uri: `s3://${s3RootBucket}/model/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`\n }\n\n // generate training config\n const trainingConfig = {\n TrainingJobName: trainingJobName,\n TrainingInput: trainingInput,\n ValidationInput: validationInput,\n TrainingOutput: trainingOutput\n };\n\n // upload input request to S3\n const s3UploadParams = {\n Bucket: s3RootBucket,\n Key: 'workflow_request.json',\n Body: JSON.stringify(event, null, 2),\n ContentType: 'application/json'\n };\n await s3.putObject(s3UploadParams);\n\n // build return object\n const ret = {\n WorkflowRequest: event.WorkflowInput,\n S3RootBucket: s3RootBucket,\n DataDate: dataDate,\n RunDate: runDate,\n TrainingConfig: trainingConfig\n };\n\n return ret;\n };\n\n return initializeWorkflow(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n" | |
}, | |
"Description": "Initializes a Training ML workflow", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"InitializeTrainingWorkflowLambdaFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 60, | |
"Runtime": "nodejs8.10", | |
"FunctionName": "mdlc-training-initialize-workflow" | |
} | |
}, | |
"MDLCBatchInferenceWorkflow": { | |
"Type": "AWS::StepFunctions::StateMachine", | |
"Properties": { | |
"RoleArn": { | |
"Fn::GetAtt": [ | |
"BatchInferenceWorkflowRole", | |
"Arn" | |
] | |
}, | |
"DefinitionString": { | |
"Fn::Sub": "{\n \"StartAt\": \"Initialize\",\n \"States\": {\n \"Initialize\": {\n \"Type\": \"Task\",\n \"Resource\": \"${InitializeBatchInferenceWorkflowLambdaFunction.Arn}\",\n \"Parameters\": {\n \"WorkflowInput.$\": \"$\",\n \"S3RootBucket\": \"${MDLCS3Bucket}\"\n },\n \"ResultPath\": \"$\",\n \"Next\": \"Find Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Find Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"${FindModelLambdaFunction.Arn}\",\n \"Parameters\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\"\n },\n \"ResultPath\": \"$.FindModelOutput\",\n \"Next\": \"Inference\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Inference\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createTransformJob.sync\",\n \"Parameters\": {\n \"TransformJobName.$\": \"$.InferenceConfig.InferenceJobName\",\n \"ModelName.$\": \"$.FindModelOutput.SageMakerModelName\",\n \"TransformInput\": {\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.InferenceConfig.InferenceInput.S3Uri\"\n }\n },\n \"ContentType\": \"text/csv\",\n \"SplitType\": \"Line\",\n \"CompressionType\": \"None\"\n },\n \"TransformOutput\": {\n \"S3OutputPath.$\": \"$.InferenceConfig.InferenceOutput.S3Uri\",\n \"Accept\": \"text/csv\",\n \"AssembleWith\": \"Line\"\n },\n \"TransformResources\": {\n \"InstanceType\": \"ml.c4.2xlarge\",\n \"InstanceCount\": 10\n }\n },\n \"ResultPath\": \"$.InferenceOutput\",\n \"Next\": \"Monitor Model Performance\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Monitor Model Performance\": {\n \"Type\": \"Task\",\n \"Resource\": \"${MonitorModelPerformanceLambdaFunction.Arn}\",\n \"Parameters\": {\n \"PredictionInput.$\": \"$.MonitorConfig.MonitorInput.PredictionS3Uri\",\n \"TargetInput.$\": \"$.MonitorConfig.MonitorInput.TargetS3Uri\"\n },\n \"ResultPath\": \"$.MonitoringOutput\",\n \"Next\": \"Is Re-training Needed?\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Is Re-training Needed?\": {\n \"Type\": \"Choice\",\n \"Choices\": [{\n \"Variable\": \"$.MonitoringOutput.ModelPerformance\",\n \"NumericLessThan\": 1.0,\n \"Next\": \"Re-train Model\"\n }, {\n \"Variable\": \"$.MonitoringOutput.ModelPerformance\",\n \"NumericGreaterThanEquals\": 1.0,\n \"Next\": \"Finalize\"\n }],\n \"Default\": \"Finalize\"\n },\n \"Re-train Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::states:startExecution.sync\",\n \"Parameters\": {\n \"Input\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\",\n \"DataDate.$\": \"$.WorkflowRequest.DataDate\"\n },\n \"StateMachineArn\": \"${MDLCTrainingWorkflow}\"\n },\n \"Next\": \"Finalize\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Finalize\": {\n \"Type\": \"Pass\",\n \"End\": true\n },\n \"Handle Error\": {\n \"Type\": \"Pass\",\n \"Next\": \"Failure\"\n },\n \"Failure\": {\n \"Type\": \"Fail\"\n }\n }\n}" | |
}, | |
"StateMachineName": "mdlc-batch-inference-workflow" | |
} | |
}, | |
"TrainingWorkflowRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/AWSLambdaFullAccess", | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaRole", | |
"arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"states.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"MonitorModelPerformanceLambdaFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Layers": [ | |
{ | |
"Ref": "SklearnLayer" | |
}, | |
{ | |
"Ref": "PandasLayer" | |
} | |
], | |
"Code": { | |
"ZipFile": "import boto3\nimport os\n\nimport numpy as np\nimport pandas as pd\n\nfrom sklearn.metrics import roc_auc_score\n\n\ns3_client = boto3.client('s3')\n\ndef lambda_handler(event, context):\n\n ## load the dataset date and s3 input path\n # dataset_date = event['DatasetDate']\n # run_date = event['RunDate']\n preduction_path = event['PredictionInput']\n target_path = event['TargetInput']\n\n ## read the target and inference result\n target = pd.read_csv(target_path, header=None, names=['target'])\n prediction = pd.read_csv(preduction_path, header=None, names=['prediction'])\n\n ## compute the monitoring result\n monitoring_result = roc_auc_score(target.values, prediction.values)\n\n # print('By {}, for the dataset date {}, the AUC is {}'.format(run_date, dataset_date, monitoring_result))\n\n return {'ModelPerformance': monitoring_result}\n" | |
}, | |
"Description": "Monitor SageMaker model performance", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.lambda_handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"MonitorModelPerformanceLambdaFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 900, | |
"Runtime": "python3.6", | |
"FunctionName": "mdlc-monitoring-model-performance" | |
} | |
}, | |
"SklearnLayer": { | |
"Type": "AWS::Lambda::LayerVersion", | |
"Properties": { | |
"Content": { | |
"S3Bucket": { | |
"Ref": "MDLCWorkshopPublicArtifactBucket" | |
}, | |
"S3Key": { | |
"Ref": "SklearnLambdaLayerArtifactKey" | |
} | |
}, | |
"LayerName": "sklearn-layer", | |
"Description": "numpy 1-14-3 scipy 1-1-0 skleran 0-20-3", | |
"CompatibleRuntimes": [ | |
"python3.6" | |
] | |
} | |
}, | |
"MDLCTrainingWorkflow": { | |
"Type": "AWS::StepFunctions::StateMachine", | |
"Properties": { | |
"RoleArn": { | |
"Fn::GetAtt": [ | |
"TrainingWorkflowRole", | |
"Arn" | |
] | |
}, | |
"DefinitionString": { | |
"Fn::Sub": "{\n \"StartAt\": \"Initialize\",\n \"States\": {\n \"Initialize\": {\n \"Type\": \"Task\",\n \"Resource\": \"${InitializeTrainingWorkflowLambdaFunction.Arn}\",\n \"Parameters\": {\n \"WorkflowInput.$\": \"$\",\n \"S3RootBucket\": \"${MDLCS3Bucket}\"\n },\n \"ResultPath\": \"$\",\n \"Next\": \"Train Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Train Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createTrainingJob.sync\",\n \"Parameters\": {\n \"TrainingJobName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"AlgorithmSpecification\": {\n \"TrainingImage\": \"433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1\",\n \"TrainingInputMode\": \"File\"\n },\n \"RoleArn\": \"${SageMakerTrainingRole.Arn}\",\n \"HyperParameters\": {\n \"objective\": \"binary:logistic\",\n \"colsample_bytree\": \"0.7\",\n \"max_depth\": \"4\",\n \"eta\": \"0.2\",\n \"gamma\": \"4\",\n \"min_child_weight\": \"6\",\n \"subsample\": \"0.7\",\n \"learning_rate\": \"0.075\",\n \"silent\": \"0\",\n \"num_round\": \"200\",\n \"seed\": \"0\"\n },\n \"InputDataConfig\": [\n {\n \"ChannelName\": \"train\",\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.TrainingConfig.TrainingInput.S3Uri\",\n \"S3DataDistributionType\": \"FullyReplicated\"\n }\n },\n \"CompressionType\": \"None\",\n \"RecordWrapperType\": \"None\",\n \"ContentType\": \"text/csv\"\n },\n {\n \"ChannelName\": \"validation\",\n \"DataSource\": {\n \"S3DataSource\": {\n \"S3DataType\": \"S3Prefix\",\n \"S3Uri.$\": \"$.TrainingConfig.ValidationInput.S3Uri\",\n \"S3DataDistributionType\": \"FullyReplicated\"\n }\n },\n \"CompressionType\": \"None\",\n \"RecordWrapperType\": \"None\",\n \"ContentType\": \"text/csv\"\n }\n ],\n \"OutputDataConfig\": {\n \"S3OutputPath.$\": \"$.TrainingConfig.TrainingOutput.S3Uri\"\n },\n \"ResourceConfig\": {\n \"InstanceCount\": 1,\n \"InstanceType\": \"ml.m5.4xlarge\",\n \"VolumeSizeInGB\": 50\n },\n \"StoppingCondition\": {\n \"MaxRuntimeInSeconds\": 3600\n }\n },\n \"ResultPath\": \"$.TrainingOutput\",\n \"Next\": \"Create Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Create Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::sagemaker:createModel\",\n \"Parameters\": {\n \"ModelName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"ExecutionRoleArn\": \"${SageMakerTrainingRole.Arn}\",\n \"PrimaryContainer\": {\n \"Image\": \"433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1\",\n \"ModelDataUrl.$\": \"$.TrainingOutput.ModelArtifacts.S3ModelArtifacts\"\n }\n },\n \"ResultPath\": \"$.CreateModelOutput\",\n \"Next\": \"Register Model\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Register Model\": {\n \"Type\": \"Task\",\n \"Resource\": \"${RegisterModelLambdaFunction.Arn}\",\n \"Parameters\": {\n \"ModelName.$\": \"$.WorkflowRequest.ModelName\",\n \"SageMakerModelName.$\": \"$.TrainingConfig.TrainingJobName\",\n \"Timestamp.$\": \"$.RunDate\"\n },\n \"ResultPath\": \"$.RegisterModelOutput\",\n \"Next\": \"Finalize\",\n \"Catch\": [\n {\n \"ErrorEquals\": [\"States.ALL\"],\n \"Next\": \"Handle Error\",\n \"ResultPath\": \"$.Error\"\n }\n ]\n },\n \"Finalize\": {\n \"Type\": \"Pass\",\n \"End\": true\n },\n \"Handle Error\": {\n \"Type\": \"Pass\",\n \"Next\": \"Failure\"\n },\n \"Failure\": {\n \"Type\": \"Fail\"\n }\n }\n}" | |
}, | |
"StateMachineName": "mdlc-training-workflow" | |
} | |
}, | |
"InitializeBatchInferenceWorkflowLambdaFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Code": { | |
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst s3 = new AWS.S3();\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const initializeWorkflow = async(event) => {\n // get event parameters\n const modelName = event.WorkflowInput.ModelName;\n const dataDate = DateUtil.getDateFromUtcString(event.WorkflowInput.DataDate);\n const s3RootBucket = event.S3RootBucket;\n\n // generate inference job name\n const runDate = new Date();\n const inferenceJobName = `${modelName}-${DateUtil.formatDateTimeStringShort(runDate)}`;\n\n // generate inference input folder\n const inferenceInput = {\n S3Key: `input-data/${event.WorkflowInput.DataDate}/holdout_data`,\n S3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/holdout_data`\n };\n\n // generate inference output folder\n const inferenceOutput = {\n S3Key: `output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`,\n S3Uri: `s3://${s3RootBucket}/output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}`\n };\n\n // generate inference config\n const inferenceConfig = {\n InferenceJobName: inferenceJobName,\n InferenceInput: inferenceInput,\n InferenceOutput: inferenceOutput\n };\n\n // generate holdout target input folder\n const monitorInput = {\n TargetS3Uri: `s3://${s3RootBucket}/input-data/${event.WorkflowInput.DataDate}/holdout_target/holdout_target.csv`,\n PredictionS3Uri: `s3://${s3RootBucket}/output-data/${event.WorkflowInput.DataDate}/${DateUtil.formatDateTimeStringShort(runDate)}/holdout_data.csv.out`\n }\n\n // generate monitor config\n const monitorConfig = {\n MonitorInput: monitorInput\n };\n\n // upload input request to S3\n const s3UploadParams = {\n Bucket: s3RootBucket,\n Key: 'workflow_request.json',\n Body: JSON.stringify(event, null, 2),\n ContentType: 'application/json'\n };\n await s3.putObject(s3UploadParams);\n\n // build return object\n const ret = {\n WorkflowRequest: event.WorkflowInput,\n S3RootBucket: s3RootBucket,\n DataDate: dataDate,\n RunDate: runDate,\n InferenceConfig: inferenceConfig,\n MonitorConfig: monitorConfig\n };\n\n return ret;\n };\n\n return initializeWorkflow(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n" | |
}, | |
"Description": "Initializes a Batch Inference ML workflow", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"InitializeBatchInferenceWorkflowLambdaFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 60, | |
"Runtime": "nodejs8.10", | |
"FunctionName": "mdlc-batch-inference-initialize-workflow" | |
} | |
}, | |
"PandasLayer": { | |
"Type": "AWS::Lambda::LayerVersion", | |
"Properties": { | |
"Content": { | |
"S3Bucket": { | |
"Ref": "MDLCWorkshopPublicArtifactBucket" | |
}, | |
"S3Key": { | |
"Ref": "PandasLambdaLayerArtifactKey" | |
} | |
}, | |
"LayerName": "pandas-layer", | |
"Description": "pandas 0-24-2 pyst 2018-4 s3fs 0-1-5", | |
"CompatibleRuntimes": [ | |
"python3.6" | |
] | |
} | |
}, | |
"CopyS3ResourceFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"FindModelLambdaFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Code": { | |
"ZipFile": "const AWS = require(\"aws-sdk\");\nconst dynamodb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });\n\nexports.handler = async(event, context, callback) => {\n console.log('Received event', JSON.stringify(event, null, 2));\n\n const findModel = async(event) => {\n // get event parameters\n const modelName = event.ModelName;\n\n // build item\n var params = {\n TableName: \"MODEL_REGISTRY\",\n KeyConditionExpression: 'MODEL_NAME = :ModelName',\n ProjectionExpression: 'SAGEMAKER_MODEL_NAME',\n ExpressionAttributeValues: {\n ':ModelName': modelName\n },\n ConsistentRead: true,\n ScanIndexForward: false,\n Limit: 1\n };\n\n // put item\n let data;\n try {\n data = await dynamodb.query(params).promise();\n } catch (error) {\n return {\n statusCode: 400,\n error: `Could not post: ${error.stack}`\n };\n }\n\n if (data.Items.length == 0) {\n throw \"Unable to find model\";\n }\n\n // build return object\n const ret = {\n SageMakerModelName: data.Items[0]['SAGEMAKER_MODEL_NAME']\n };\n\n return ret;\n };\n\n return findModel(event).then((result) => {\n callback(null, result);\n });\n};\n\nclass DateUtil {\n static getDateFromUtcString(dateString) {\n return new Date(dateString);\n }\n\n static formatDateString(date) {\n return date.toISOString().split('T')[0];\n }\n\n static formatDateTimeString(date) {\n return date.toISOString();\n }\n\n static formatDateStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n return year + monthFull + dayOfMonthFull;\n }\n\n static formatDateTimeStringShort(date) {\n const year = date.getUTCFullYear().toString();\n const month = date.getUTCMonth() + 1;\n const dayOfMonth = date.getUTCDate();\n const hours = date.getUTCHours();\n const minutes = date.getUTCMinutes();\n const seconds = date.getUTCSeconds();\n\n const monthFull = month < 10 ? '0' + month : month;\n const dayOfMonthFull = dayOfMonth < 10 ? '0' + dayOfMonth : dayOfMonth;\n const hoursFull = hours < 10 ? '0' + hours : hours;\n const minutesFull = minutes < 10 ? '0' + minutes : minutes;\n const secondsFull = seconds < 10 ? '0' + seconds : seconds;\n\n return year + monthFull + dayOfMonthFull + 'T' + hoursFull + minutesFull + secondsFull;\n }\n};\n" | |
}, | |
"Description": "Find the latest SageMaker Model", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"FindModelLambdaFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 60, | |
"Runtime": "nodejs8.10", | |
"FunctionName": "mdlc-batch-inference-find-model" | |
} | |
}, | |
"MonitorModelPerformanceLambdaFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"CopyS3ResourceFunction": { | |
"Type": "AWS::Lambda::Function", | |
"Properties": { | |
"Code": { | |
"ZipFile": "import base64\nimport boto3\nimport httplib\nimport json\nimport os\n\nfrom urllib2 import build_opener, HTTPHandler, Request\n\n\ns3_client = boto3.client(\"s3\")\n\ndef sendResponse(event, context, status, message):\n bucket = event[\"ResourceProperties\"].get(\"Target\", {}).get(\"Bucket\")\n key = event[\"ResourceProperties\"].get(\"Target\", {}).get(\"Key\")\n\n body = json.dumps({\n \"Status\": status,\n \"Reason\": message,\n \"StackId\": event['StackId'],\n \"RequestId\": event['RequestId'],\n \"LogicalResourceId\": event['LogicalResourceId'],\n \"PhysicalResourceId\": \"s3://{}/{}\".format(bucket, key),\n \"Data\": {\n \"Bucket\": bucket,\n \"Key\": key,\n },\n })\n\n request = Request(event['ResponseURL'], data=body)\n request.add_header('Content-Type', '')\n request.add_header('Content-Length', len(body))\n request.get_method = lambda: 'PUT'\n\n opener = build_opener(HTTPHandler)\n response = opener.open(request)\n\ndef handler(event, context):\n print(\"Received request:\", json.dumps(event, indent=4))\n\n request = event[\"RequestType\"]\n properties = event[\"ResourceProperties\"]\n print properties\n\n if \"Target\" not in properties or all(prop not in properties for prop in [\"Body\", \"Base64Body\", \"Source\"]):\n return sendResponse(event, context, \"FAILED\", \"Missing required parameters\")\n\n target = properties[\"Target\"]\n\n sink_bucket = target[\"Bucket\"]\n sink_prefix = target[\"Key\"]\n\n if request in (\"Create\", \"Update\"):\n if \"Body\" in properties:\n target.update({\n \"Body\": properties[\"Body\"],\n })\n\n s3_client.put_object(**target)\n\n elif \"Base64Body\" in properties:\n try:\n body = base64.b64decode(properties[\"Base64Body\"])\n except:\n return sendResponse(event, context, \"FAILED\", \"Malformed Base64Body\")\n\n target.update({\n \"Body\": body\n })\n\n s3_client.put_object(**target)\n\n elif \"Source\" in properties:\n source = properties[\"Source\"]\n source_bucket = source[\"Bucket\"]\n source_prefix = source[\"Key\"]\n\n paginator = s3_client.get_paginator(\"list_objects_v2\")\n page_iterator = paginator.paginate(Bucket=source_bucket, Prefix=source_prefix)\n\n for source_key in {x['Key'] for page in page_iterator for x in page[\"Contents\"]}:\n sink_key = os.path.join(sink_prefix, os.path.relpath(source_key, source_prefix))\n print \"copy {} to {}\".format(source_key, sink_key)\n\n if not source_key.endswith(\"/\"):\n print \"copy {} to {}\".format(source_key, sink_key)\n s3_client.copy_object(\n CopySource={\"Bucket\": source_bucket, \"Key\": source_key},\n Bucket=sink_bucket,\n Key = sink_key,\n MetadataDirective=\"COPY\",\n TaggingDirective=\"COPY\"\n )\n\n else:\n return sendResponse(event, context, \"FAILED\", \"Malformed body\")\n\n return sendResponse(event, context, \"SUCCESS\", \"Created\")\n\n if request == \"Delete\":\n\n paginator = s3_client.get_paginator(\"list_objects_v2\")\n page_iterator = paginator.paginate(Bucket=sink_bucket, Prefix=sink_prefix)\n sink_objects = [{'Key': x['Key']} for page in page_iterator for x in page['Contents']]\n s3_client.delete_objects(\n Bucket=sink_bucket,\n Delete={'Objects': sink_objects}\n )\n\n return sendResponse(event, context, \"SUCCESS\", \"Deleted\")\n\n return sendResponse(event, context, \"FAILED\", \"Unexpected: {}\".format(request))\n" | |
}, | |
"Description": "Copy S3 Input Object", | |
"Tags": [ | |
{ | |
"Value": "SAM", | |
"Key": "lambda:createdBy" | |
} | |
], | |
"MemorySize": 128, | |
"Handler": "index.handler", | |
"Role": { | |
"Fn::GetAtt": [ | |
"CopyS3ResourceFunctionRole", | |
"Arn" | |
] | |
}, | |
"Timeout": 900, | |
"Runtime": "python2.7", | |
"FunctionName": "mdlc-copy-object" | |
} | |
}, | |
"RegisterModelLambdaFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"InputDataObject": { | |
"Type": "Custom::S3Object", | |
"Properties": { | |
"Source": { | |
"Bucket": { | |
"Ref": "MDLCWorkshopPublicArtifactBucket" | |
}, | |
"Key": "input-data" | |
}, | |
"ServiceToken": { | |
"Fn::GetAtt": "CopyS3ResourceFunction.Arn" | |
}, | |
"Target": { | |
"Bucket": { | |
"Ref": "MDLCS3Bucket" | |
}, | |
"Key": "input-data" | |
} | |
} | |
}, | |
"MDLCS3Bucket": { | |
"Type": "AWS::S3::Bucket" | |
}, | |
"InitializeTrainingWorkflowLambdaFunctionRole": { | |
"Type": "AWS::IAM::Role", | |
"Properties": { | |
"ManagedPolicyArns": [ | |
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", | |
"arn:aws:iam::aws:policy/AmazonS3FullAccess" | |
], | |
"AssumeRolePolicyDocument": { | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": [ | |
"sts:AssumeRole" | |
], | |
"Effect": "Allow", | |
"Principal": { | |
"Service": [ | |
"lambda.amazonaws.com" | |
] | |
} | |
} | |
] | |
} | |
} | |
}, | |
"ModelTable": { | |
"Type": "AWS::DynamoDB::Table", | |
"Properties": { | |
"KeySchema": [ | |
{ | |
"KeyType": "HASH", | |
"AttributeName": "MODEL_NAME" | |
}, | |
{ | |
"KeyType": "RANGE", | |
"AttributeName": "MODEL_TIMESTAMP" | |
} | |
], | |
"TableName": "MODEL_REGISTRY", | |
"AttributeDefinitions": [ | |
{ | |
"AttributeName": "MODEL_NAME", | |
"AttributeType": "S" | |
}, | |
{ | |
"AttributeName": "MODEL_TIMESTAMP", | |
"AttributeType": "S" | |
} | |
], | |
"BillingMode": "PAY_PER_REQUEST" | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment