Created
March 10, 2020 13:36
-
-
Save Andrew9255/d70c03e2a5531841daa1691aced3abbd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
# In[2]: | |
import os | |
import boto3 | |
import re | |
from sagemaker import get_execution_role | |
role = get_execution_role() | |
bucket='Your_S3_Bucket' | |
data_key = 'Your_dataset.csv' | |
data_location = 's3://{}/{}'.format(bucket, data_key) | |
# In[3]: | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import io | |
import time | |
import json | |
import sagemaker.amazon.common as smac | |
# In[4]: | |
data = pd.read_csv(data_location) | |
data.to_csv("data.csv", sep=',', index=False) | |
# print the shape of the data file | |
print(data.shape) | |
# show the top few rows | |
display(data.head()) | |
# describe the data object | |
display(data.describe()) | |
# In[10]: | |
rand_split = np.random.rand(len(data)) | |
train_list = rand_split < 0.8 | |
val_list = (rand_split >= 0.8) & (rand_split < 0.9) | |
test_list = rand_split >= 0.9 | |
data_train = data[train_list] | |
data_val = data[val_list] | |
data_test = data[test_list] | |
train_y = ((data_train.iloc[:,6])).as_matrix(); | |
train_X = (data_train.iloc[:,1:6]).as_matrix(); | |
# train_y = ((data_train.iloc[:,1] == 'M') +0).as_matrix(); | |
# train_X = data_train.iloc[:,2:].as_matrix(); | |
val_y = ((data_val.iloc[:,6]).as_matrix()) | |
val_X = data_val.iloc[:,1:6].as_matrix() | |
test_y = ((data_test.iloc[:,6]).as_matrix()) | |
test_X = data_test.iloc[:,1:6].as_matrix() | |
# In[12]: | |
train_file = 'linear_train.data' | |
f = io.BytesIO() | |
smac.write_numpy_to_dense_tensor(f, train_X.astype('float32'), train_y.astype('float32')) | |
f.seek(0) | |
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', train_file)).upload_fileobj(f) | |
# In[13]: | |
validation_file = 'linear_validation.data' | |
f = io.BytesIO() | |
smac.write_numpy_to_dense_tensor(f, val_X.astype('float32'), val_y.astype('float32')) | |
f.seek(0) | |
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation', validation_file)).upload_fileobj(f) | |
# --- | |
# ## Train | |
# | |
# Now we can begin to specify our linear model. Amazon SageMaker's Linear Learner actually fits many models in parallel, each with slightly different hyperparameters, and then returns the one with the best fit. This functionality is automatically enabled. We can influence this using parameters like: | |
# | |
# - `num_models` to increase to total number of models run. The specified parameters will always be one of those models, but the algorithm also chooses models with nearby parameter values in order to find a solution nearby that may be more optimal. In this case, we're going to use the max of 32. | |
# - `loss` which controls how we penalize mistakes in our model estimates. For this case, let's use absolute loss as we haven't spent much time cleaning the data, and absolute loss will be less sensitive to outliers. | |
# - `wd` or `l1` which control regularization. Regularization can prevent model overfitting by preventing our estimates from becoming too finely tuned to the training data, which can actually hurt generalizability. In this case, we'll leave these parameters as their default "auto" though. | |
# ### Specify container images used for training and hosting SageMaker's linear-learner | |
# In[14]: | |
# See 'Algorithms Provided by Amazon SageMaker: Common Parameters' in the SageMaker documentation for an explanation of these values. | |
from sagemaker.amazon.amazon_estimator import get_image_uri | |
container = get_image_uri(boto3.Session().region_name, 'linear-learner') | |
# In[17]: | |
linear_job = 'DEMO-linear-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()) | |
print("Job name is:", linear_job) | |
linear_training_params = { | |
"RoleArn": role, | |
"TrainingJobName": linear_job, | |
"AlgorithmSpecification": { | |
"TrainingImage": container, | |
"TrainingInputMode": "File" | |
}, | |
"ResourceConfig": { | |
"InstanceCount": 1, | |
"InstanceType": "ml.c4.2xlarge", | |
"VolumeSizeInGB": 10 | |
}, | |
"InputDataConfig": [ | |
{ | |
"ChannelName": "train", | |
"DataSource": { | |
"S3DataSource": { | |
"S3DataType": "S3Prefix", | |
"S3Uri": "s3://{}/{}/train/".format(bucket, prefix), | |
"S3DataDistributionType": "ShardedByS3Key" | |
} | |
}, | |
"CompressionType": "None", | |
"RecordWrapperType": "None" | |
}, | |
{ | |
"ChannelName": "validation", | |
"DataSource": { | |
"S3DataSource": { | |
"S3DataType": "S3Prefix", | |
"S3Uri": "s3://{}/{}/validation/".format(bucket, prefix), | |
"S3DataDistributionType": "FullyReplicated" | |
} | |
}, | |
"CompressionType": "None", | |
"RecordWrapperType": "None" | |
} | |
], | |
"OutputDataConfig": { | |
"S3OutputPath": "s3://{}/{}/".format(bucket, prefix) | |
}, | |
"HyperParameters": { | |
"feature_dim": "5", | |
"mini_batch_size": "100", | |
"predictor_type": "regressor", | |
"epochs": "10", | |
"num_models": "32", | |
"loss": "absolute_loss" | |
}, | |
"StoppingCondition": { | |
"MaxRuntimeInSeconds": 60 * 60 | |
} | |
} | |
# Now let's kick off our training job in SageMaker's distributed, managed training, using the parameters we just created. Because training is managed, we don't have to wait for our job to finish to continue, but for this case, let's use boto3's 'training_job_completed_or_stopped' waiter so we can ensure that the job has been started. | |
# In[18]: | |
get_ipython().run_cell_magic('time', '', "\nregion = boto3.Session().region_name\nsm = boto3.client('sagemaker')\n\nsm.create_training_job(**linear_training_params)\n\nstatus = sm.describe_training_job(TrainingJobName=linear_job)['TrainingJobStatus']\nprint(status)\nsm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=linear_job)\nif status == 'Failed':\n message = sm.describe_training_job(TrainingJobName=linear_job)['FailureReason']\n print('Training failed with the following error: {}'.format(message))\n raise Exception('Training job failed')") | |
# --- | |
# ## Host | |
# | |
# Now that we've trained the linear algorithm on our data, let's setup a model which can later be hosted. We will: | |
# 1. Point to the scoring container | |
# 1. Point to the model.tar.gz that came from training | |
# 1. Create the hosting model | |
# In[22]: | |
linear_hosting_container = { | |
'Image': container, | |
'ModelDataUrl': sm.describe_training_job(TrainingJobName=linear_job)['ModelArtifacts']['S3ModelArtifacts'] | |
} | |
create_model_response = sm.create_model( | |
ModelName=linear_job, | |
ExecutionRoleArn=role, | |
PrimaryContainer=linear_hosting_container) | |
print(create_model_response['ModelArn']) | |
# Once we've setup a model, we can configure what our hosting endpoints should be. Here we specify: | |
# 1. EC2 instance type to use for hosting | |
# 1. Initial number of instances | |
# 1. Our hosting model name | |
# In[23]: | |
linear_endpoint_config = 'DEMO-linear-endpoint-config-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()) | |
print(linear_endpoint_config) | |
create_endpoint_config_response = sm.create_endpoint_config( | |
EndpointConfigName=linear_endpoint_config, | |
ProductionVariants=[{ | |
'InstanceType': 'ml.t2.medium', | |
'InitialInstanceCount': 1, | |
'ModelName': linear_job, | |
'VariantName': 'AllTraffic'}]) | |
print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn']) | |
# Now that we've specified how our endpoint should be configured, we can create them. This can be done in the background, but for now let's run a loop that updates us on the status of the endpoints so that we know when they are ready for use. | |
# In[24]: | |
get_ipython().run_cell_magic('time', '', '\nlinear_endpoint = \'DEMO-linear-endpoint-\' + time.strftime("%Y%m%d%H%M", time.gmtime())\nprint(linear_endpoint)\ncreate_endpoint_response = sm.create_endpoint(\n EndpointName=linear_endpoint,\n EndpointConfigName=linear_endpoint_config)\nprint(create_endpoint_response[\'EndpointArn\'])\n\nresp = sm.describe_endpoint(EndpointName=linear_endpoint)\nstatus = resp[\'EndpointStatus\']\nprint("Status: " + status)\n\nsm.get_waiter(\'endpoint_in_service\').wait(EndpointName=linear_endpoint)\n\nresp = sm.describe_endpoint(EndpointName=linear_endpoint)\nstatus = resp[\'EndpointStatus\']\nprint("Arn: " + resp[\'EndpointArn\'])\nprint("Status: " + status)\n\nif status != \'InService\':\n raise Exception(\'Endpoint creation did not succeed\')') | |
# ## Predict | |
# ### Predict on Test Data | |
# | |
# Now that we have our hosted endpoint, we can generate statistical predictions from it. Let's predict on our test dataset to understand how accurate our model is. | |
# | |
# There are many metrics to measure classification accuracy. Common examples include include: | |
# - Precision | |
# - Recall | |
# - F1 measure | |
# - Area under the ROC curve - AUC | |
# - Total Classification Accuracy | |
# - Mean Absolute Error | |
# | |
# For our example, we'll keep things simple and use total classification accuracy as our metric of choice. We will also evaluate Mean Absolute Error (MAE) as the linear-learner has been optimized using this metric, not necessarily because it is a relevant metric from an application point of view. We'll compare the performance of the linear-learner against a naive benchmark prediction which uses majority class observed in the training data set for prediction on the test data. | |
# | |
# | |
# | |
# ### Function to convert an array to a csv | |
# In[25]: | |
def np2csv(arr): | |
csv = io.BytesIO() | |
np.savetxt(csv, arr, delimiter=',', fmt='%g') | |
return csv.getvalue().decode().rstrip() | |
# Next, we'll invoke the endpoint to get predictions. | |
# In[26]: | |
runtime= boto3.client('runtime.sagemaker') | |
payload = np2csv(test_X) | |
response = runtime.invoke_endpoint(EndpointName=linear_endpoint, | |
ContentType='text/csv', | |
Body=payload) | |
result = json.loads(response['Body'].read().decode()) | |
test_pred = np.array([r['score'] for r in result['predictions']]) | |
# Let's compare linear learner based mean absolute prediction errors from a baseline prediction which uses majority class to predict every instance. | |
# In[27]: | |
test_mae_linear = np.mean(np.abs(test_y - test_pred)) | |
test_mae_baseline = np.mean(np.abs(test_y - np.median(train_y))) ## training median as baseline predictor | |
print("Test MAE Baseline :", round(test_mae_baseline, 3)) | |
print("Test MAE Linear:", round(test_mae_linear,3)) | |
# Let's compare predictive accuracy using a classification threshold of 0.5 for the predicted and compare against the majority class prediction from training data set | |
# In[28]: | |
test_pred_class = (test_pred > 0.5)+0; | |
test_pred_baseline = np.repeat(np.median(train_y), len(test_y)) | |
prediction_accuracy = np.mean((test_y == test_pred_class))*100 | |
baseline_accuracy = np.mean((test_y == test_pred_baseline))*100 | |
print("Prediction Accuracy:", round(prediction_accuracy,1), "%") | |
print("Baseline Accuracy:", round(baseline_accuracy,1), "%") | |
# ###### Run the cell below to delete endpoint once you are done. | |
# In[29]: | |
sm.delete_endpoint(EndpointName=linear_endpoint) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@Andrew9255 Can you create a public repo from this code sample? I think the open-source community would benefit from your work.