Skip to content

Instantly share code, notes, and snippets.

@gavinmh
Created August 14, 2020 16:31
Show Gist options
  • Save gavinmh/267bc34ddedaf0931151a901859e165d to your computer and use it in GitHub Desktop.
Save gavinmh/267bc34ddedaf0931151a901859e165d to your computer and use it in GitHub Desktop.
example.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Amazon SageMaker Multi-Model Endpoints using Scikit Learn\n",
"With [Amazon SageMaker multi-model endpoints](https://docs.aws.amazon.com/sagemaker/latest/dg/multi-model-endpoints.html), customers can create an endpoint that seamlessly hosts up to thousands of models. These endpoints are well suited to use cases where any one of a large number of models, which can be served from a common inference container to save inference costs, needs to be invokable on-demand and where it is acceptable for infrequently invoked models to incur some additional latency. For applications which require consistently low inference latency, an endpoint deploying a single model is still the best choice.\n",
"\n",
"At a high level, Amazon SageMaker manages the loading and unloading of models for a multi-model endpoint, as they are needed. When an invocation request is made for a particular model, Amazon SageMaker routes the request to an instance assigned to that model, downloads the model artifacts from S3 onto that instance, and initiates loading of the model into the memory of the container. As soon as the loading is complete, Amazon SageMaker performs the requested invocation and returns the result. If the model is already loaded in memory on the selected instance, the downloading and loading steps are skipped and the invocation is performed immediately.\n",
"\n",
"To demonstrate how multi-model endpoints are created and used, this notebook provides an example using a set of Scikit Learn models that each predict housing prices for a single location. This domain is used as a simple example to easily experiment with multi-model endpoints.\n",
"\n",
"The Amazon SageMaker multi-model endpoint capability is designed to work across with Mxnet, PyTorch and Scikit-Learn machine learning frameworks (TensorFlow coming soon), SageMaker XGBoost, KNN, and Linear Learner algorithms.\n",
"\n",
"In addition, Amazon SageMaker multi-model endpoints are also designed to work with cases where you bring your own container that integrates with the multi-model server library. An example of this can be found [here](https://github.com/awslabs/amazon-sagemaker-examples/tree/master/advanced_functionality/multi_model_bring_your_own) and documentation [here.](https://docs.aws.amazon.com/sagemaker/latest/dg/build-multi-model-build-container.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Contents\n",
"\n",
"1. [Generate synthetic data for housing models](#Generate-synthetic-data-for-housing-models)\n",
"1. [Train multiple house value prediction models](#Train-multiple-house-value-prediction-models)\n",
"1. [Create the Amazon SageMaker MultiDataModel entity](#Create-the-Amazon-SageMaker-MultiDataModel-entity)\n",
" 1. [Create the Multi-Model Endpoint](#Create-the-multi-model-endpoint)\n",
" 1. [Deploy the Multi-Model Endpoint](#deploy-the-multi-model-endpoint)\n",
"1. [Get Predictions from the endpoint](#Get-predictions-from-the-endpoint)\n",
"1. [Additional Information](#Additional-information)\n",
"1. [Clean up](#Clean-up)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate synthetic data for housing models\n",
"\n",
"The code below contains helper functions to generate synthetic data in the form of a `1x7` numpy array representing the features of a house.\n",
"\n",
"The first entry in the array is the randomly generated price of a house. The remaining entries are the features (i.e. number of bedroom, square feet, number of bathrooms, etc.).\n",
"\n",
"These functions will be used to generate synthetic data for training, validation, and testing. It will also allow us to submit synthetic payloads for inference to test our multi-model endpoint."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"!pip install -qU awscli boto3 sagemaker pip"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"NUM_HOUSES_PER_LOCATION = 1000\n",
"LOCATIONS = ['NewYork_NY', 'LosAngeles_CA', 'Chicago_IL', 'Houston_TX', 'Dallas_TX',\n",
" 'Phoenix_AZ', 'Philadelphia_PA', 'SanAntonio_TX', 'SanDiego_CA', 'SanFrancisco_CA']\n",
"PARALLEL_TRAINING_JOBS = 4 # len(LOCATIONS) if your account limits can handle it\n",
"MAX_YEAR = 2019"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def gen_price(house):\n",
" _base_price = int(house['SQUARE_FEET'] * 150)\n",
" _price = int(_base_price + (10000 * house['NUM_BEDROOMS']) + \\\n",
" (15000 * house['NUM_BATHROOMS']) + \\\n",
" (15000 * house['LOT_ACRES']) + \\\n",
" (15000 * house['GARAGE_SPACES']) - \\\n",
" (5000 * (MAX_YEAR - house['YEAR_BUILT'])))\n",
" return _price"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def gen_random_house():\n",
" _house = {'SQUARE_FEET': int(np.random.normal(3000, 750)),\n",
" 'NUM_BEDROOMS': np.random.randint(2, 7),\n",
" 'NUM_BATHROOMS': np.random.randint(2, 7) / 2,\n",
" 'LOT_ACRES': round(np.random.normal(1.0, 0.25), 2),\n",
" 'GARAGE_SPACES': np.random.randint(0, 4),\n",
" 'YEAR_BUILT': min(MAX_YEAR, int(np.random.normal(1995, 10)))}\n",
" _price = gen_price(_house)\n",
" return [_price, _house['YEAR_BUILT'], _house['SQUARE_FEET'], \n",
" _house['NUM_BEDROOMS'], _house['NUM_BATHROOMS'], \n",
" _house['LOT_ACRES'], _house['GARAGE_SPACES']]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def gen_houses(num_houses):\n",
" _house_list = []\n",
" for i in range(num_houses):\n",
" _house_list.append(gen_random_house())\n",
" _df = pd.DataFrame(_house_list, \n",
" columns=['PRICE', 'YEAR_BUILT',\n",
" 'SQUARE_FEET', 'NUM_BEDROOMS',\n",
" 'NUM_BATHROOMS', 'LOT_ACRES',\n",
" 'GARAGE_SPACES'])\n",
" return _df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Train multiple house value prediction models\n",
"\n",
"In the follow section, we are setting up the code to train a house price prediction model for each of 4 different cities.\n",
"\n",
"As such, we will launch multiple training jobs asynchronously, using the AWS Managed container for Scikit Learn via the Sagemaker SDK using the `SKLearn` estimator class.\n",
"\n",
"In this notebook, we will be using the AWS Managed Scikit Learn image for both training and inference - this image provides native support for launching multi-model endpoints."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"import sagemaker\n",
"from sagemaker import get_execution_role\n",
"import boto3\n",
"\n",
"s3 = boto3.resource('s3')\n",
"\n",
"sagemaker_session = sagemaker.Session()\n",
"role = get_execution_role()\n",
"\n",
"BUCKET = sagemaker_session.default_bucket()\n",
"TRAINING_FILE = 'training.py'\n",
"INFERENCE_FILE = 'inference.py'\n",
"SOURCE_DIR = 'source_dir'\n",
"\n",
"DATA_PREFIX = 'DEMO_MME_SCIKIT_V3'\n",
"MULTI_MODEL_ARTIFACTS = 'multi_model_artifacts'\n",
"\n",
"TRAIN_INSTANCE_TYPE = 'ml.m4.xlarge'\n",
"ENDPOINT_INSTANCE_TYPE = 'ml.m4.xlarge'\n",
"\n",
"ENDPOINT_NAME = 'mme-sklearn-housing-V3'\n",
"\n",
"MODEL_NAME = ENDPOINT_NAME"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Split a given dataset into train, validation, and test\n",
"\n",
"The code below will generate 3 sets of data. 1 set to train, 1 set for validation and 1 for testing."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.model_selection import train_test_split\n",
"SEED = 7\n",
"SPLIT_RATIOS = [0.6, 0.3, 0.1]\n",
"\n",
"def split_data(df):\n",
" # split data into train and test sets\n",
" seed = SEED\n",
" val_size = SPLIT_RATIOS[1]\n",
" test_size = SPLIT_RATIOS[2]\n",
" \n",
" num_samples = df.shape[0]\n",
" X1 = df.values[:num_samples, 1:] # keep only the features, skip the target, all rows\n",
" Y1 = df.values[:num_samples, :1] # keep only the target, all rows\n",
"\n",
" # Use split ratios to divide up into train/val/test\n",
" X_train, X_val, y_train, y_val = \\\n",
" train_test_split(X1, Y1, test_size=(test_size + val_size), random_state=seed)\n",
" # Of the remaining non-training samples, give proper ratio to validation and to test\n",
" X_test, X_test, y_test, y_test = \\\n",
" train_test_split(X_val, y_val, test_size=(test_size / (test_size + val_size)), \n",
" random_state=seed)\n",
" # reassemble the datasets with target in first column and features after that\n",
" _train = np.concatenate([y_train, X_train], axis=1)\n",
" _val = np.concatenate([y_val, X_val], axis=1)\n",
" _test = np.concatenate([y_test, X_test], axis=1)\n",
"\n",
" return _train, _val, _test"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prepare training and inference scripts\n",
"\n",
"By using the Scikit Learn estimator via the Sagemaker SDK, we can host and train models on Amazon Sagemaker.\n",
"\n",
"For training, we do the following:\n",
"\n",
"1. Prepare a training script - this script will execute the training logic within a SageMaker managed Scikit Learn container.\n",
"\n",
"\n",
"2. Create a `sagemaker.sklearn.estimator.SKLearn` estimator\n",
"\n",
"\n",
"3. Call the estimators `.fit()` method.\n",
"\n",
"For more information on using scikit learn with the Sagemaker SDK, see the docs [here.](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html)\n",
"\n",
"Below, we will create the training script called `training.py` that will be located at the root of a dicrectory called `source_dir`.\n",
"\n",
"In this example, we will be training a [RandomForestRegressor](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html) model that will later be used for inference in predicting house prices.\n",
"\n",
"**NOTE:** You would modify the script below to implement your own training logic."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"mkdir: cannot create directory ‘source_dir’: File exists\r\n"
]
}
],
"source": [
"!mkdir $SOURCE_DIR"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting source_dir/training.py\n"
]
}
],
"source": [
"%%writefile $SOURCE_DIR/$TRAINING_FILE\n",
"\n",
"import argparse\n",
"import os\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from sklearn.ensemble import RandomForestRegressor\n",
"import joblib\n",
"\n",
"if __name__ =='__main__':\n",
"\n",
" print('extracting arguments')\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" # hyperparameters sent by the client are passed as command-line arguments to the script.\n",
" # to simplify the demo we don't use all sklearn RandomForest hyperparameters\n",
" parser.add_argument('--n-estimators', type=int, default=10)\n",
" parser.add_argument('--min-samples-leaf', type=int, default=3)\n",
"\n",
" # Data, model, and output directories\n",
" parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))\n",
" parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))\n",
" parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))\n",
" parser.add_argument('--model-name', type=str)\n",
"\n",
" args, _ = parser.parse_known_args()\n",
"\n",
" print('reading data')\n",
" print('model_name: {}'.format(args.model_name))\n",
"\n",
" train_file = os.path.join(args.train, args.model_name + '_train.csv') \n",
" train_df = pd.read_csv(train_file) # read in the training data\n",
"\n",
" val_file = os.path.join(args.validation, args.model_name + '_val.csv')\n",
" test_df = pd.read_csv(os.path.join(val_file)) # read in the test data\n",
"\n",
" # Matrix representation of the data\n",
" print('building training and testing datasets')\n",
" X_train = train_df[train_df.columns[1:train_df.shape[1]]] \n",
" X_test = test_df[test_df.columns[1:test_df.shape[1]]]\n",
" y_train = train_df[train_df.columns[0]]\n",
" y_test = test_df[test_df.columns[0]]\n",
"\n",
" # fitting the model\n",
" print('training model')\n",
" model = RandomForestRegressor(\n",
" n_estimators=args.n_estimators,\n",
" min_samples_leaf=args.min_samples_leaf,\n",
" n_jobs=-1)\n",
" \n",
" model.fit(X_train, y_train)\n",
"\n",
" # print abs error\n",
" print('validating model')\n",
" abs_err = np.abs(model.predict(X_test) - y_test)\n",
"\n",
" # print couple perf metrics\n",
" for q in [10, 50, 90]:\n",
" print('AE-at-' + str(q) + 'th-percentile: '\n",
" + str(np.percentile(a=abs_err, q=q)))\n",
" \n",
" # persist model\n",
" path = os.path.join(args.model_dir, 'model.joblib')\n",
" joblib.dump(model, path)\n",
" print('model persisted at ' + path)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting source_dir/requirements.txt\n"
]
}
],
"source": [
"%%writefile $SOURCE_DIR/requirements.txt\n",
"\n",
"shap"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When using multi-model endpoints with the Sagemaker managed Scikit Learn container, we need to provide an entry point script for\n",
"inference that will **at least** load the saved model.\n",
"\n",
"We will now create this script and call it `inference.py` and store it at the root of a directory called `source_dir`. This is the same directory which contains our `training.py` script.\n",
"\n",
"**Note:** You could place the below `model_fn` function within the `training.py` script (above the main guard) if you prefer to have a single script.\n",
"\n",
"**Note:** You would modify the script below to implement your own inferencing logic.\n",
"\n",
"Additional information on model loading and model serving for Scikit Learn on SageMaker can be found [here.](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html#deploy-a-scikit-learn-model)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting source_dir/inference.py\n"
]
}
],
"source": [
"%%writefile $SOURCE_DIR/$INFERENCE_FILE\n",
"\n",
"import os\n",
"import joblib\n",
"import shap\n",
"\n",
"\n",
"def model_fn(model_dir):\n",
" print('loading model.joblib from: {}'.format(model_dir))\n",
" loaded_model = joblib.load(os.path.join(model_dir, 'model.joblib'))\n",
" return loaded_model"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Launch a single training job for a given housing location\n",
"There is nothing specific to multi-model endpoints in terms of the models it will host. They are trained in the same way as all other SageMaker models. Here we are using the Scikit Learn estimator and not waiting for the job to complete."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from sagemaker.sklearn.estimator import SKLearn\n",
"from sagemaker.inputs import TrainingInput\n",
"\n",
"\n",
"def launch_training_job(location):\n",
" # clear out old versions of the data\n",
" s3_bucket = s3.Bucket(BUCKET)\n",
" full_input_prefix = f'{DATA_PREFIX}/model_prep/{location}'\n",
" s3_bucket.objects.filter(Prefix=full_input_prefix + '/').delete()\n",
"\n",
" # upload the entire set of data for all three channels\n",
" local_folder = f'data/{location}'\n",
" inputs = sagemaker_session.upload_data(path=local_folder, \n",
" key_prefix=full_input_prefix)\n",
" \n",
" print(f'Training data uploaded: {inputs}')\n",
" \n",
" _job = 'skl-{}'.format(location.replace('_', '-'))\n",
" full_output_prefix = f'{DATA_PREFIX}/model_artifacts/{location}'\n",
" s3_output_path = f's3://{BUCKET}/{full_output_prefix}'\n",
" \n",
" code_location = f's3://{BUCKET}/{full_input_prefix}/code'\n",
" \n",
"\n",
" # Add code_location argument in order to ensure that code_artifacts are stored in the same place.\n",
" estimator = SKLearn(\n",
" entry_point=TRAINING_FILE, # script to use for training job\n",
" role=role,\n",
" source_dir=SOURCE_DIR, # Location of scripts\n",
" instance_count=1,\n",
" instance_type=TRAIN_INSTANCE_TYPE,\n",
" framework_version='0.23-1',# 0.23-1 is the latest version\n",
" output_path=s3_output_path,# Where to store model artifacts\n",
" base_job_name=_job,\n",
" code_location=code_location,# This is where the .tar.gz of the source_dir will be stored\n",
" metric_definitions=[\n",
" {'Name' : 'median-AE',\n",
" 'Regex': 'AE-at-50th-percentile: ([0-9.]+).*$'}],\n",
" hyperparameters = {'n-estimators' : 100,\n",
" 'min-samples-leaf': 3,\n",
" 'model-name' : location})\n",
" \n",
" DISTRIBUTION_MODE = 'FullyReplicated'\n",
" \n",
" train_input = TrainingInput(s3_data=inputs+'/train', \n",
" distribution=DISTRIBUTION_MODE, content_type='csv')\n",
" \n",
" val_input = TrainingInput(s3_data=inputs+'/val', \n",
" distribution=DISTRIBUTION_MODE, content_type='csv')\n",
" \n",
" remote_inputs = {'train': train_input, 'validation': val_input}\n",
"\n",
" estimator.fit(remote_inputs, wait=False)\n",
" \n",
" # Return the estimator object\n",
" return estimator\n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Kick off a model training job for each housing location"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def save_data_locally(location, train, val, test):\n",
"# _header = ','.join(COLUMNS)\n",
" \n",
" os.makedirs(f'data/{location}/train')\n",
" np.savetxt(f'data/{location}/train/{location}_train.csv', train, delimiter=',', fmt='%.2f')\n",
" \n",
" os.makedirs(f'data/{location}/val')\n",
" np.savetxt(f'data/{location}/val/{location}_val.csv', val, delimiter=',', fmt='%.2f')\n",
" \n",
" os.makedirs(f'data/{location}/test')\n",
" np.savetxt(f'data/{location}/test/{location}_test.csv', test, delimiter=',', fmt='%.2f')\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Training data uploaded: s3://sagemaker-us-east-1-511226949265/DEMO_MME_SCIKIT_V3/model_prep/NewYork_NY\n",
"Training data uploaded: s3://sagemaker-us-east-1-511226949265/DEMO_MME_SCIKIT_V3/model_prep/LosAngeles_CA\n",
"Training data uploaded: s3://sagemaker-us-east-1-511226949265/DEMO_MME_SCIKIT_V3/model_prep/Chicago_IL\n",
"Training data uploaded: s3://sagemaker-us-east-1-511226949265/DEMO_MME_SCIKIT_V3/model_prep/Houston_TX\n",
"\n",
"4 training jobs launched: ['skl-NewYork-NY-2020-08-14-15-47-25-807', 'skl-LosAngeles-CA-2020-08-14-15-47-28-468', 'skl-Chicago-IL-2020-08-14-15-47-30-974', 'skl-Houston-TX-2020-08-14-15-47-33-535']\n"
]
}
],
"source": [
"import shutil\n",
"import os\n",
"\n",
"estimators = []\n",
"\n",
"shutil.rmtree('data', ignore_errors=True)\n",
"\n",
"for loc in LOCATIONS[:PARALLEL_TRAINING_JOBS]:\n",
" _houses = gen_houses(NUM_HOUSES_PER_LOCATION)\n",
" _train, _val, _test = split_data(_houses)\n",
" save_data_locally(loc, _train, _val, _test)\n",
" estimator = launch_training_job(loc)\n",
" estimators.append(estimator)\n",
" time.sleep(2) # to avoid throttling the CreateTrainingJob API\n",
"\n",
"print()\n",
"print(f'{len(estimators)} training jobs launched: {[x.latest_training_job.job_name for x in estimators]}')\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Wait for all model training to finish"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"def wait_for_training_job_to_complete(estimator):\n",
" job = estimator.latest_training_job.job_name\n",
" print(f'Waiting for job: {job}')\n",
" status = estimator.latest_training_job.describe()['TrainingJobStatus']\n",
" while status == 'InProgress':\n",
" time.sleep(45)\n",
" status = estimator.latest_training_job.describe()['TrainingJobStatus']\n",
" if status == 'InProgress':\n",
" print(f'{job} job status: {status}')\n",
" print(f'DONE. Status for {job} is {status}\\n')\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Waiting for job: skl-NewYork-NY-2020-08-14-15-47-25-807\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"skl-NewYork-NY-2020-08-14-15-47-25-807 job status: InProgress\n",
"DONE. Status for skl-NewYork-NY-2020-08-14-15-47-25-807 is Completed\n",
"\n",
"Waiting for job: skl-LosAngeles-CA-2020-08-14-15-47-28-468\n",
"DONE. Status for skl-LosAngeles-CA-2020-08-14-15-47-28-468 is Completed\n",
"\n",
"Waiting for job: skl-Chicago-IL-2020-08-14-15-47-30-974\n",
"DONE. Status for skl-Chicago-IL-2020-08-14-15-47-30-974 is Completed\n",
"\n",
"Waiting for job: skl-Houston-TX-2020-08-14-15-47-33-535\n",
"DONE. Status for skl-Houston-TX-2020-08-14-15-47-33-535 is Completed\n",
"\n"
]
}
],
"source": [
"# wait for the jobs to finish\n",
"for est in estimators:\n",
" wait_for_training_job_to_complete(est)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create the multi-model endpoint with the SageMaker SDK"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a SageMaker Model from one of the Estimators"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"estimator = estimators[0]\n",
"# inference.py is the entry_point for when we deploy the model\n",
"# Note how we do NOT specify source_dir again, this information is inherited from the estimator\n",
"model = estimator.create_model(role=role, entry_point='inference.py')\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the Amazon SageMaker MultiDataModel entity\n",
"\n",
"We create the multi-model endpoint using the [```MultiDataModel```](https://sagemaker.readthedocs.io/en/stable/api/inference/multi_data_model.html) class.\n",
"\n",
"You can create a MultiDataModel by directly passing in a `sagemaker.model.Model` object - in which case, the Endpoint will inherit information about the image to use, as well as any environmental variables, network isolation, etc., once the MultiDataModel is deployed.\n",
"\n",
"In addition, a MultiDataModel can also be created without explictly passing a `sagemaker.model.Model` object. Please refer to the documentation for additional details."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"from sagemaker.multidatamodel import MultiDataModel"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"# This is where our MME will read models from on S3.\n",
"model_data_prefix = f's3://{BUCKET}/{DATA_PREFIX}/{MULTI_MODEL_ARTIFACTS}/'"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"mme = MultiDataModel(name=MODEL_NAME,\n",
" model_data_prefix=model_data_prefix,\n",
" model=model,# passing our model\n",
" sagemaker_session=sagemaker_session)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploy the Multi Model Endpoint\n",
"\n",
"You need to consider the appropriate instance type and number of instances for the projected prediction workload across all the models you plan to host behind your multi-model endpoint. The number and size of the individual models will also drive memory requirements."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-----------------!"
]
}
],
"source": [
"predictor = mme.deploy(initial_instance_count=1,\n",
" instance_type=ENDPOINT_INSTANCE_TYPE,\n",
" endpoint_name=ENDPOINT_NAME)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Our endpoint has launched! Let's look at what models are available to the endpoint!\n",
"\n",
"By 'available', what we mean is, what model artfiacts are currently stored under the S3 prefix we defined when setting up the `MultiDataModel` above i.e. `model_data_prefix`.\n",
"\n",
"Currently, since we have no artifacts (i.e. `tar.gz` files) stored under our defined S3 prefix, our endpoint, will have no models 'available' to serve inference requests.\n",
"\n",
"We will demonstrate how to make models 'available' to our endpoint below."
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[]"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# No models visible!\n",
"list(mme.list_models())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Lets deploy model artifacts to be found by the endpoint\n",
"\n",
"We are now using the `.add_model()` method of the `MultiDataModel` to copy over our model artifacts from where they were initially stored, during training, to where our endpoint will source model artifacts for inference requests.\n",
"\n",
"`model_data_source` refers to the location of our model artifact (i.e. where it was deposited on S3 after training completed)\n",
"\n",
"`model_data_path` is the **relative** path to the S3 prefix we specified above (i.e. `model_data_prefix`) where our endpoint will source models for inference requests.\n",
"\n",
"Since this is a **relative** path, we can simply pass the name of what we wish to call the model artifact at inference time (i.e. `Chicago_IL.tar.gz`)\n",
"\n",
"### Dynamically deploying additional models\n",
"\n",
"It is also important to note, that we can always use the `.add_model()` method, as shown below, to dynamically deploy more models to the endpoint, to serve up inference requests as needed."
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"for est in estimators:\n",
" artifact_path = est.latest_training_job.describe()['ModelArtifacts']['S3ModelArtifacts']\n",
" model_name = artifact_path.split('/')[-4]+'.tar.gz'\n",
" # This is copying over the model artifact to the S3 location for the MME.\n",
" mme.add_model(model_data_source=artifact_path, model_data_path=model_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## We have added the 4 model artifacts from our training jobs!\n",
"\n",
"We can see that the S3 prefix we specified when setting up `MultiDataModel` now has 4 model artifacts. As such, the endpoint can now serve up inference requests for these models."
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['Chicago_IL.tar.gz',\n",
" 'Houston_TX.tar.gz',\n",
" 'LosAngeles_CA.tar.gz',\n",
" 'NewYork_NY.tar.gz']"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(mme.list_models())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Get predictions from the endpoint\n",
"\n",
"Recall that ```mme.deploy()``` returns a [RealTimePredictor](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/predictor.py#L35) that we saved in a variable called ```predictor```.\n",
"\n",
"We will use ```predictor``` to submit requests to the endpoint."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Invoking models on a multi-model endpoint\n",
"Notice the higher latencies on the first invocation of any given model. This is due to the time it takes SageMaker to download the model to the Endpoint instance and then load the model into the inference container. Subsequent invocations of the same model take advantage of the model already being loaded into the inference container."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"ename": "ModelError",
"evalue": "An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (500) from model with message \"{\n \"code\": 500,\n \"type\": \"InternalServerException\",\n \"message\": \"Worker died.\"\n}\n\". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/mme-sklearn-housing-V3 in account 511226949265 for more information.",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mModelError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-27-9543cdd9e28b>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mstart_time\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0mpredicted_value\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mpredictor\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpredict\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdata\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mgen_random_house\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtarget_model\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'Chicago_IL.tar.gz'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0mduration\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtime\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0mstart_time\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/predictor.py\u001b[0m in \u001b[0;36mpredict\u001b[0;34m(self, data, initial_args, target_model, target_variant)\u001b[0m\n\u001b[1;32m 93\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 94\u001b[0m \u001b[0mrequest_args\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_create_request_args\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdata\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial_args\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtarget_model\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtarget_variant\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 95\u001b[0;31m \u001b[0mresponse\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msagemaker_session\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msagemaker_runtime_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0minvoke_endpoint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m**\u001b[0m\u001b[0mrequest_args\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 96\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_handle_response\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresponse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 97\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_api_call\u001b[0;34m(self, *args, **kwargs)\u001b[0m\n\u001b[1;32m 314\u001b[0m \"%s() only accepts keyword arguments.\" % py_operation_name)\n\u001b[1;32m 315\u001b[0m \u001b[0;31m# The \"self\" in this scope is referring to the BaseClient.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 316\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_make_api_call\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0moperation_name\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 317\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 318\u001b[0m \u001b[0m_api_call\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__name__\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpy_operation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_make_api_call\u001b[0;34m(self, operation_name, api_params)\u001b[0m\n\u001b[1;32m 633\u001b[0m \u001b[0merror_code\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Error\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m{\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Code\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 634\u001b[0m \u001b[0merror_class\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexceptions\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_code\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0merror_code\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 635\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0merror_class\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mparsed_response\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 636\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 637\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mModelError\u001b[0m: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (500) from model with message \"{\n \"code\": 500,\n \"type\": \"InternalServerException\",\n \"message\": \"Worker died.\"\n}\n\". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/mme-sklearn-housing-V3 in account 511226949265 for more information."
]
}
],
"source": [
"start_time = time.time()\n",
"\n",
"predicted_value = predictor.predict(data=gen_random_house()[1:], target_model='Chicago_IL.tar.gz')\n",
"\n",
"duration = time.time() - start_time\n",
"print('${:,.2f}, took {:,d} ms\\n'.format(predicted_value[0], int(duration * 1000)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"\n",
"predicted_value = predictor.predict(data=gen_random_house()[1:], target_model='Chicago_IL.tar.gz')\n",
"\n",
"duration = time.time() - start_time\n",
"print('${:,.2f}, took {:,d} ms\\n'.format(predicted_value[0], int(duration * 1000)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"\n",
"predicted_value = predictor.predict(data=gen_random_house()[1:], target_model='Houston_TX.tar.gz')\n",
"\n",
"duration = time.time() - start_time\n",
"print('${:,.2f}, took {:,d} ms\\n'.format(predicted_value[0], int(duration * 1000)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"\n",
"predicted_value = predictor.predict(data=gen_random_house()[1:], target_model='Houston_TX.tar.gz')\n",
"\n",
"duration = time.time() - start_time\n",
"print('${:,.2f}, took {:,d} ms\\n'.format(predicted_value[0], int(duration * 1000)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Updating a model\n",
"To update a model, you would follow the same approach as above and add it as a new model. For example, if you have retrained the `NewYork_NY.tar.gz` model and wanted to start invoking it, you would upload the updated model artifacts behind the S3 prefix with a new name such as `NewYork_NY_v2.tar.gz`, and then change the `target_model` field to invoke `NewYork_NY_v2.tar.gz` instead of `NewYork_NY.tar.gz`. You do not want to overwrite the model artifacts in Amazon S3, because the old version of the model might still be loaded in the containers or on the storage volume of the instances on the endpoint. Invocations to the new model could then invoke the old version of the model.\n",
"\n",
"Alternatively, you could stop the endpoint and re-deploy a fresh set of models."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using Boto APIs to invoke the endpoint\n",
"\n",
"While developing interactively within a Jupyter notebook, since `.deploy()` returns a `RealTimePredictor` it is a more seamless experience to start invoking your endpoint using the SageMaker SDK. You have more fine grained control over the serialization and deserialization protocols to shape your request and response payloads to/from the endpoint. This is the approach we demonstrated above where the `RealTimePredictor` was stored in the variable `predictor`.\n",
"\n",
"This is great for iterative experimentation within a notebook. Furthermore, should you have an application that has access to the SageMaker SDK, you can always import `RealTimePredictor` and attach it to an existing endpoint - this allows you to stick to using the high level SDK if preferable.\n",
"\n",
"Additional documentation on `RealTimePredictor` can be found [here.](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html?highlight=RealTimePredictor#sagemaker.predictor.RealTimePredictor)\n",
"\n",
"The lower level Boto3 SDK may be preferable if you are attempting to invoke the endpoint as a part of a broader architecture.\n",
"\n",
"Imagine an API gateway frontend that uses a Lambda Proxy in order to transform request payloads before hitting a SageMaker Endpoint - in this example, Lambda does not have access to the SageMaker Python SDK, and as such, Boto3 can still allow you to interact with your endpoint and serve inference requests.\n",
"\n",
"Boto3 allows for quick injection of ML intelligence via SageMaker Endpoints into existing applications with minimal/no refactoring to existing code.\n",
"\n",
"Boto3 will submit your requests as a binary payload, while still allowing you to supply your desired `Content-Type` and `Accept` headers with serialization being handled by the inference container in the SageMaker Endpoint.\n",
"\n",
"Additional documentation on `.invoke_endpoint()` can be found [here.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime.html)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import json\n",
"\n",
"runtime_sm_client = boto3.client(service_name='sagemaker-runtime')\n",
"\n",
"def predict_one_house_value(features, model_name):\n",
" print(f'Using model {model_name} to predict price of this house: {features}')\n",
"\n",
" float_features = [float(i) for i in features]\n",
" body = ','.join(map(str, float_features)) + '\\n'\n",
" \n",
" start_time = time.time()\n",
"\n",
" response = runtime_sm_client.invoke_endpoint(\n",
" EndpointName=ENDPOINT_NAME,\n",
" ContentType='text/csv',\n",
" TargetModel=model_name,\n",
" Body=body)\n",
" \n",
" predicted_value = json.loads(response['Body'].read())[0]\n",
"\n",
" duration = time.time() - start_time\n",
" \n",
" print('${:,.2f}, took {:,d} ms\\n'.format(predicted_value, int(duration * 1000)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"predict_one_house_value(gen_random_house()[1:], 'Chicago_IL.tar.gz')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Clean up\n",
"Here, to be sure we are not billed for endpoints we are no longer using, we clean up."
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"predictor.delete_endpoint()"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"predictor.delete_model()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment