Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Scikit-learn and related code samples
# Which attributes/features to choose?
# Which model to use?
# Tune/optimize the chosen model for the best performance
# Ensuring the trained model will generalize to unseen data
# Estimate performance of the trained model on unseen data
# imports
import sklearn
import IPython.display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, cross_val_score
# Load the iris toy dataset; sklearn classes/functions are imported where first needed.
from sklearn.datasets import load_iris

iris = load_iris()
X, y = iris.data, iris.target
print(X.shape)
print(y.shape)

# KNN classification, illustrating sklearn's 4-step modelling pattern.
# Step 1: import the estimator class.
from sklearn.neighbors import KNeighborsClassifier

# Step 2: instantiate the estimator ("model" in sklearn terms) with the desired parameters.
knn = KNeighborsClassifier(n_neighbors=1)

# Step 3: fit the model, i.e. train it (fit() updates the estimator in place).
knn.fit(X, y)

# Step 4: predict for new samples, first a single one and then a small batch.
knn.predict([[3, 7, 11, 2]])
X_test = [
    [3, 5, 4, 2],
    [5, 4, 3, 2],
]
knn.predict(X_test)
# MODEL TUNING: try different model parameters for the best performance.
# Bind k once and pass it on, so the value set here is the one the estimator really uses.
n_neighbors = 5
knn = KNeighborsClassifier(n_neighbors=n_neighbors)
knn.fit(X, y)
knn.predict(X_test)
# Try a different estimator, following the same import -> instantiate -> train -> predict pattern.
from sklearn.linear_model import LogisticRegression

# fit() returns the estimator itself, so instantiation and training can be chained.
est = LogisticRegression().fit(X, y)
y_pred = est.predict(X)

# Workflow: choose model -> tune model -> estimate performance on out-of-sample data.
# Classification accuracy; here it is computed on the same samples the model was fit on,
# i.e. this is training accuracy.
acc = metrics.accuracy_score(y, y_pred)
print(acc)
# TRAIN-TEST split:
# hold out 40% of the samples, train on the remainder, evaluate on the held-out part.
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4)

model = LogisticRegression().fit(X_train, y_train)
y_test_pred = model.predict(X_test)

# Accuracy on samples the model has not seen during training.
acc_test = metrics.accuracy_score(y_test, y_test_pred)
print(acc_test)
# Search for the best k in KNeighborsClassifier using the held-out test split.
k_range = range(1, 26)
acc_scores = []
for k in k_range:
    # One model per candidate k, trained on the train split and scored on the test split.
    est = KNeighborsClassifier(n_neighbors=k)
    est.fit(X_train, y_train)
    y_test_pred = est.predict(X_test)
    acc_scores.append(round(metrics.accuracy_score(y_test, y_test_pred), 3))

# Plot testing accuracy vs. model complexity (k).
plt.plot(k_range, acc_scores)
plt.xlabel("K in KNN")
plt.ylabel("Test accuracy")
# after choosing and tuning the model based on train-test-val splits, retrain the final one on whole dataset
# train-test split evaluation gives high variance estimate of out-of-sample accuracy, as testing accuracy can vary
# a lot based on which samples are there in the test set.
# use k-folds cross-validation for better estimate
#========================================
# Features and the response side by side in one frame; the response goes in the last column, 'type'.
df = pd.DataFrame(np.column_stack([X, y]), columns=iris.feature_names + ['type'])
# Visualize the relationship between the features and the response variable:
# sns.pairplot(df,x_vars=iris.feature_names, y_vars='type')
# A train-test split can be used for feature selection as well.
## Use k-fold cross-validation for selecting the model, tuning parameters and selecting features.
# k-fold gives a better estimate of out-of-sample accuracy and uses every sample for both
# training and testing.
from sklearn.model_selection import KFold

kf = KFold(n_splits=10)  # 10 splits is the commonly recommended value
# split() only builds a lazy generator of (train_idx, test_idx) pairs; nothing is consumed here.
kf.split(X)
# In classification problems, use stratified sampling for the folds.
# Repeat cross-validation several times with different random splits and average the test scores.
# Keep a hold-out set outside the CV, then report the final accuracy of the tuned model on it.
# Do feature engineering and selection inside the CV loop.
from sklearn.model_selection import cross_val_score

knn = KNeighborsClassifier(n_neighbors=5)
scores = cross_val_score(knn, X, y, cv=10, scoring='accuracy')
print(scores.mean())

# Search/tune k using the mean 10-fold CV accuracy of each candidate.
k_scores = []
for k in range(1, 31):
    knn = KNeighborsClassifier(n_neighbors=k)
    scores = cross_val_score(knn, X, y, cv=10, scoring='accuracy')
    k_scores.append(round(scores.mean(), 3))
## Grid search over hyper-parameters
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

# Candidate values for k, wrapped in the {parameter name: candidates} mapping the search expects.
k_range = range(1, 31)
param_grid = {'n_neighbors': k_range}

# Instantiate the exhaustive search (10-fold CV, accuracy) and run it.
grid = GridSearchCV(knn, param_grid, cv=10, scoring='accuracy')
grid.fit(X, y)

# Mean CV score of every candidate.
grid.cv_results_['mean_test_score']

# Examine the best model found.
grid.best_score_
grid.best_params_
grid.best_estimator_
## Search/tune several parameters at once: every (k, weighting) combination is evaluated.
k_range = range(1, 31)
weight_options = ['uniform', 'distance']
param_grid = {'n_neighbors': k_range, 'weights': weight_options}

grid = GridSearchCV(knn, param_grid, cv=10, scoring='accuracy')
grid.fit(X, y)

# Examine the best model found.
grid.best_score_
grid.best_params_
grid.best_estimator_

# The fitted search object can predict directly; it delegates to the best estimator.
grid.predict([[3, 6, 8, 11]])
## RandomizedSearchCV -> searches only a subset of the parameter combinations; n_iter controls how many.
k_range = range(1, 31)
weight_options = ['uniform', 'distance']

# Parameter distributions to sample from.
param_dist = {'n_neighbors': k_range, 'weights': weight_options}

# 20 sampled combinations; random_state makes the sampling repeatable.
rand = RandomizedSearchCV(
    knn,
    param_distributions=param_dist,
    scoring='accuracy',
    n_iter=20,
    random_state=50,
)
rand.fit(X, y)

# Examine the best model found.
rand.best_score_
rand.best_params_
rand.best_estimator_
## Start with grid search and see how long it takes. If it is too slow, switch to RandomizedSearchCV with an appropriate n_iter value.
## Model evaluation procedure and evaluation metric: estimate how well the overall pipeline will perform on out-of-sample data -> used for model selection, parameter tuning (choosing optimal values), and choosing among features (feature selection).
import pandas as pd

# Pima Indians diabetes data; the file has no header row, so column names are supplied here.
data_url = 'https://raw.githubusercontent.com/justmarkham/scikit-learn-videos/master/data/pima-indians-diabetes.data'
col_names = ['pregnant', 'glucose', 'bp', 'skin', 'insulin', 'bmi', 'pedigree', 'age', 'label']
pima = pd.read_csv(data_url, header=None, names=col_names)

## Define X and y, using only a subset of the features.
feature_names = ['pregnant', 'insulin', 'bmi', 'age']
X = pima[feature_names]
y = pima['label']

# Split the data (25% held out, fixed seed for a repeatable split).
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=10)

# Train a logistic regression model on the training set and predict for the test set.
from sklearn.linear_model import LogisticRegression

logreg = LogisticRegression()
logreg.fit(X_train, y_train)
y_test_pred = logreg.predict(X_test)

# Classification accuracy on the test set.
from sklearn import metrics

acc = metrics.accuracy_score(y_test, y_test_pred)
print(acc)

# Class distribution of the test set (absolute counts).
y_test.value_counts()

# Share of samples per class. value_counts() orders by frequency, so per0 is the share of
# the most frequent class, not necessarily of the class labelled 0.
per0, per1 = y_test.value_counts(normalize=True)

# Null accuracy: the accuracy of always predicting the dominant class (a baseline).
acc_null = round(y_test.value_counts(normalize=True).max(), 3)
print(acc_null)
## Classification accuracy says nothing about the underlying distribution of labels/predictions.
# The confusion matrix can be used to derive many performance metrics and also works for
# multi-class problems.
cmatrix = metrics.confusion_matrix(y_test, y_test_pred)
# Row 0 holds the actual negatives (TN, FP), row 1 the actual positives (FN, TP).
TN, FP, FN, TP = cmatrix.ravel()

## Metrics computed from the confusion matrix
# Accuracy and error rate
acc_conf = (TP + TN) / (TP + TN + FP + FN)
err_conf = 1 - acc_conf

# Sensitivity / recall / TPR (by hand, then via sklearn for comparison)
sens = TP / (TP + FN)
metrics.recall_score(y_test, y_test_pred)

# Specificity / selectivity / TNR
spec = TN / (TN + FP)

# False positive rate
fpr = FP / (TN + FP)

# Precision (by hand, then via sklearn for comparison)
precision = TP / (TP + FP)
metrics.precision_score(y_test, y_test_pred)

# F1 score
f1 = metrics.f1_score(y_test, y_test_pred)
## Adjusting the classification threshold
y_test_pred = logreg.predict(X_test)
# Column 1 of predict_proba is the predicted probability of the positive class.
y_test_prob = logreg.predict_proba(X_test)[:, 1]

# Look at how the predicted probabilities are distributed.
plt.hist(y_test_prob, bins=8)
plt.xlim(0, 1)
plt.title('Histogram of predicted probabilities')
plt.xlabel('Predicted probability of diabetes')
plt.ylabel('Frequency')

# Lower the threshold (default 0.5) for predicting the positive class; this raises sensitivity.
from sklearn.preprocessing import binarize

# binarize expects a 2-D input, hence the wrapping list and the [0] to unwrap the result.
y_pred_thresh = binarize([y_test_prob], threshold=0.3)[0]
cmatrix_thresh = metrics.confusion_matrix(y_test, y_pred_thresh)

sens = metrics.recall_score(y_test, y_pred_thresh)  # TPR: increased
# TNR: decreased. Row 0 of the matrix is (TN, FP).
tn_thresh, fp_thresh = cmatrix_thresh[0, 0], cmatrix_thresh[0, 1]
spec = tn_thresh / (tn_thresh + fp_thresh)
# Sensitivity and specificity trade off against each other.
# Compute and plot the ROC curve (sensitivity vs. 1 - specificity) over all thresholds.
fpr, tpr, thresholds = metrics.roc_curve(y_test, y_test_prob)

plt.plot(fpr, tpr)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.grid(True)
plt.title('ROC curve for diabetes classifier')
plt.xlabel('False Positive Rate (1 - Specificity)')
plt.ylabel('True Positive Rate (Sensitivity)')
# define a function that accepts a threshold and prints sensitivity and specificity
def evaluate_threshold(threshold):
    """Print the sensitivity and specificity of the ROC point selected by *threshold*.

    Reads the module-level ``fpr``, ``tpr`` and ``thresholds`` arrays and reports the last
    entry whose threshold is strictly greater than *threshold*.

    Raises:
        IndexError: if no entry of ``thresholds`` exceeds *threshold*.
    """
    above = thresholds > threshold  # boolean mask over the ROC points
    print('Sensitivity:', tpr[above][-1])
    print('Specificity:', 1 - fpr[above][-1])
# Summarize the ROC curve with a single number: the area under it (AUC).
# ROC-AUC is the likelihood that the classifier assigns a higher positive-class probability
# to a randomly chosen positive observation than to a randomly chosen negative one.
# Unlike classification accuracy it remains useful under high class imbalance.
# No particular threshold has to be chosen for model evaluation.
# It is less interpretable for multiclass problems; there a confusion matrix is the better choice.
roc_auc = metrics.roc_auc_score(y_test, y_test_prob)
print(roc_auc)
# Use ROC-AUC with cross_val_score. The folds are averaged: taking the best fold would give
# an optimistic estimate of out-of-sample performance.
cross_val_score(logreg, X, y, cv=10, scoring='roc_auc').mean()
#==============================
## XGBoost
# import dataset
import pandas as pd
import xgboost as xgb
import numpy as np
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn import metrics

# NOTE(review): load_boston was removed in scikit-learn 1.2; this import only works with older
# releases. Swapping in another regression dataset changes the numbers below, so confirm first.
boston = load_boston(return_X_y=False)
data = pd.DataFrame(boston.data)
data.columns = boston.feature_names
data['PRICE'] = boston.target

# Every column except the last is a feature; the last column (PRICE) is the target.
X, y = data.iloc[:, :-1], data.iloc[:, -1]
data_dmatrix = xgb.DMatrix(data=X, label=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=123)

# 'reg:squarederror' is the current name of the squared-loss objective (formerly 'reg:linear').
xg_reg = xgb.XGBRegressor(objective='reg:squarederror', colsample_bytree=0.3, learning_rate=0.1,
                          max_depth=5, alpha=10, n_estimators=10)
xg_reg.fit(X_train, y_train)
preds = xg_reg.predict(X_test)

# Root-mean-squared error on the held-out 20%.
rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))
print("RMSE: %f" % (rmse))

# 3-fold CV with the native API; stops early once test RMSE has not improved for 10 rounds.
params = {"objective": "reg:squarederror", 'colsample_bytree': 0.3, 'learning_rate': 0.1,
          'max_depth': 5, 'alpha': 10}
cv_results = xgb.cv(dtrain=data_dmatrix, params=params, nfold=3,
                    num_boost_round=50, early_stopping_rounds=10, metrics="rmse", as_pandas=True, seed=123)
cv_results.head()
# Mean test RMSE of the final boosting round.
print((cv_results["test-rmse-mean"]).tail(1))
#===========================
# (1)
# Random search over random-forest hyper-parameters.
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV

# Candidate values for each hyper-parameter.
bootstrap = [True, False]
max_depth = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None]
# 1.0 (all features) is what the removed 'auto' option meant for forest regressors.
max_features = [1.0, 'sqrt']
min_samples_leaf = [1, 2, 4]
min_samples_split = [2, 5, 10]
n_estimators = [10, 20, 40, 60, 100, 150, 200, 400, 600]

# Create the random grid
random_grid = {'n_estimators': n_estimators,
               'max_features': max_features,
               'max_depth': max_depth,
               'min_samples_split': min_samples_split,
               'min_samples_leaf': min_samples_leaf,
               'bootstrap': bootstrap}

# Base model whose hyper-parameters are searched.
rf = RandomForestRegressor()
rf_random = RandomizedSearchCV(estimator=rf, param_distributions=random_grid, n_iter=100, cv=3,
                               verbose=2, random_state=42, n_jobs=-1)
# best_estimator_ only exists after fitting.
# NOTE(review): assumes X_train / y_train hold the regression training split — confirm.
rf_random.fit(X_train, y_train)
best_random = rf_random.best_estimator_
# (2)
# Narrow, exhaustive grid for a random-forest regressor.
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV

param_grid = {
    'bootstrap': [True],
    'max_depth': [80, 90, 100, 110],
    'max_features': [2, 3],
    'min_samples_leaf': [3, 4, 5],
    'min_samples_split': [8, 10, 12],
    'n_estimators': [10, 20, 50, 100, 200],
}
# Create a base model
rf = RandomForestRegressor()
# Instantiate the grid search model (3-fold CV, all cores)
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=3, n_jobs=-1, verbose=2)
# (3)
#=====================
# Oversample the minority class before training a random-forest classifier.
from collections import Counter

from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from sklearn.ensemble import RandomForestClassifier

RANDOM_STATE = 42
# just uncomment the oversampling strategy you want to experiment with
ros = RandomOverSampler(random_state=RANDOM_STATE)
#ros = SMOTE(random_state=RANDOM_STATE)
#ros = ADASYN(random_state=RANDOM_STATE)
X_resampled, y_resampled = ros.fit_resample(X_train, y_train)
# this is just to check if now the 2 classes are equally distributed
print(sorted(Counter(y_resampled).items()))
rf = RandomForestClassifier(n_jobs=-1, random_state=RANDOM_STATE,
                            n_estimators=100, min_samples_leaf=11)
rf.fit(X_resampled, y_resampled)
# NOTE(review): print_report, X_valid and y_valid are not defined anywhere in this file;
# presumably they come from the notebook this snippet was taken from — confirm before running.
print_report(rf, X_valid, y_valid, t=0.4, X_train=X_train, y_train=y_train)
#========================
# 3-fold CV with the native XGBoost API; 'reg:squarederror' is the current name of the
# squared-loss objective (formerly 'reg:linear').
params = {"objective": "reg:squarederror", 'colsample_bytree': 0.3, 'learning_rate': 0.1,
          'max_depth': 5, 'alpha': 10}
cv_results = xgb.cv(dtrain=data_dmatrix, params=params, nfold=3,
                    num_boost_round=50, early_stopping_rounds=10, metrics="rmse", as_pandas=True, seed=123)
#=========================
# Alternative XGBoost search space. NOTE: this dict is a bare expression that is not bound
# to any name, so as written it has no effect on the search configured below.
{"learning_rate" : [0.05, 0.10, 0.15, 0.20, 0.30 ] ,
"max_depth" : [ 3, 5, 7, 10, 12, 15],
"min_child_weight" : [ 1, 3, 5, 7 ],
"gamma" : [ 0.0, 0.1, 0.2 , 0.3, 0.4 ],
"colsample_bytree" : [ 0.3, 0.4, 0.5 , 0.7 ] }
# A parameter grid for XGBoost
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold
from xgboost import XGBClassifier

params = {
    'min_child_weight': [1, 5, 10],
    'gamma': [0.5, 1, 1.5, 2, 5],
    'subsample': [0.6, 0.8, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0],
    'max_depth': [3, 4, 5],
}
# The classifier gets its own name so it does not shadow the `xgb` module imported earlier.
# verbosity=0 / n_jobs=1 are the current spellings of the older silent=True / nthread=1.
xgb_clf = XGBClassifier(learning_rate=0.02, n_estimators=600, objective='binary:logistic',
                        verbosity=0, n_jobs=1)
folds = 5
param_comb = 50
skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=1001)
# Pass the splitter itself rather than a one-shot generator of splits, so the search
# object can be fit again.
# NOTE(review): X / Y are expected to hold a binary-classification dataset; Y is not
# defined anywhere in this file — confirm against the caller.
random_search = RandomizedSearchCV(xgb_clf, param_distributions=params, n_iter=param_comb,
                                   scoring='roc_auc', n_jobs=-1, cv=skf, verbose=3, random_state=1001)
random_search.fit(X, Y)
#===============
#SVM
from sklearn.metrics import classification_report
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# Two sub-grids: an RBF kernel tuned over C and gamma, and a linear kernel tuned over C only.
tuned_parameters = [{'kernel': ['rbf'], 'C': [0.1, 1, 10, 100, 1000], 'gamma': [1, 0.1, 0.01, 0.001, 0.0001]},
                    {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]
scores = ['precision', 'recall']

# Run one complete grid search per target metric (macro-averaged).
for score in scores:
    print("# Tuning hyper-parameters for %s" % score)
    print()
    clf = GridSearchCV(
        SVC(), tuned_parameters, scoring='%s_macro' % score
    )
    clf.fit(X_train, y_train)
    print("Best parameters set found on development set:")
    print()
    print(clf.best_params_)
    print()
    print("Grid scores on development set:")
    print()
    means = clf.cv_results_['mean_test_score']
    stds = clf.cv_results_['std_test_score']
    # One line per candidate: mean CV score, +/- two standard deviations, parameters.
    for mean, std, params in zip(means, stds, clf.cv_results_['params']):
        print("%0.3f (+/-%0.03f) for %r"
              % (mean, std * 2, params))
    print()
    print("Detailed classification report:")
    print()
    print("The model is trained on the full development set.")
    print("The scores are computed on the full evaluation set.")
    print()
    y_true, y_pred = y_test, clf.predict(X_test)
    print(classification_report(y_true, y_pred))
    print()
#===============
# GaussianProcessClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import Matern, DotProduct, RBF, RationalQuadratic, WhiteKernel

model = GaussianProcessClassifier(kernel=DotProduct(1.0))

# Every kernel family is tried with the same six values for its first constructor argument;
# candidates are grouped by family, in the order listed.
param_grid = {
    'kernel': [
        kernel_cls(i)
        for kernel_cls in (DotProduct, Matern, RBF, RationalQuadratic, WhiteKernel)
        for i in [0.2, 0.5, 1, 2, 3, 5]
    ]
}
clf = RandomizedSearchCV(model, param_grid, n_jobs=-1, n_iter=100, random_state=0, verbose=3)
#=============== # NN Classifier
# TIPS:
# 1. A multi-layer perceptron is sensitive to feature scaling, so scaling the data is highly recommended.
# 2. Use StandardScaler inside a Pipeline.
# 3. A reasonable regularization parameter is best found with GridSearchCV, usually in the range 10.0 ** -np.arange(1, 7).
# 4. For relatively large datasets (thousands of training samples or more), Adam is very robust.
# 5. For our current use case the default settings are good.
from sklearn.neural_network import MLPClassifier

# Create the model object and train it (fit() returns the estimator itself).
clf = MLPClassifier(random_state=1, max_iter=300).fit(X_train, y_train)

# Predict: class probabilities for the first sample, labels for the first five,
# then mean accuracy on the whole test set.
clf.predict_proba(X_test[:1])
clf.predict(X_test[:5, :])
clf.score(X_test, y_test)
## Using Keras
from tensorflow import keras
from scipy.stats import reciprocal
from keras.constraints import maxnorm
# Set the NumPy seed for reproducibility. The file imports NumPy as `np`; the bare name
# `numpy` is not bound at this point.
seed = 7
np.random.seed(seed)
def build_model(n_hidden=1, n_neurons=30, learning_rate=3e-3, input_shape=[28, 28], init_mode='uniform', activation='relu', dropout_rate=0.5, weight_constraint=0):
    """Build and compile a dense softmax classifier with 10 output units.

    Args:
        n_hidden: number of hidden Dense layers.
        n_neurons: units in each hidden layer.
        learning_rate: learning rate of the Adam optimizer.
        input_shape: shape of one input sample; it is flattened before the first Dense layer.
        init_mode: kernel initializer used by every Dense layer.
        activation: activation of the hidden layers.
        dropout_rate: rate of the Dropout layer that follows each hidden layer.
        weight_constraint: max-norm limit for hidden-layer kernels; 0 means "no constraint".

    Returns:
        The compiled ``keras`` Sequential model.
    """
    # A max-norm of 0 would clamp every kernel to zero, so a falsy value disables the constraint.
    constraint = keras.constraints.MaxNorm(weight_constraint) if weight_constraint else None

    model = keras.models.Sequential()
    model.add(keras.layers.Flatten(input_shape=input_shape))
    for layer in range(n_hidden):
        model.add(keras.layers.Dense(n_neurons, activation=activation, kernel_initializer=init_mode,
                                     kernel_constraint=constraint))
        # NOTE(review): the source lost its indentation; dropout is placed after every hidden
        # layer here. With the default n_hidden=1 this equals a single dropout layer.
        model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(10, activation="softmax", kernel_initializer=init_mode))
    # `learning_rate` is the supported keyword; the old `lr` alias is deprecated.
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    return model
# define the grid search parameters
batch_size = [10, 20, 40, 60, 80, 100]
epochs = [10, 50, 100]
init_mode = ['uniform', 'lecun_uniform', 'normal', 'zero', 'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform']
activation = ['relu', 'tanh', 'sigmoid', 'linear']
weight_constraint = [0, 1, 2, 3]
dropout_rate = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]

# Architecture / optimizer parameters; the learning rate is drawn from a log-uniform distribution.
keras_param_space = {"n_hidden": [1, 2, 3, 4],
                     "n_neurons": np.arange(30, 300),
                     "learning_rate": reciprocal(3e-4, 3e-2)
                     }
# Training / regularization parameters, merged into the same search space.
param_grid = dict(batch_size=batch_size, epochs=epochs, init_mode=init_mode, dropout_rate=dropout_rate, weight_constraint=weight_constraint)
keras_param_space.update(param_grid)

# build model
# Further keyword arguments given here are forwarded to build_model as fixed values.
# `keras` is the module already imported via `from tensorflow import keras`.
# NOTE(review): keras.wrappers.scikit_learn was removed from recent TensorFlow releases;
# the scikeras package provides the replacement wrapper — confirm the installed version.
keras_clf = keras.wrappers.scikit_learn.KerasClassifier(build_fn=build_model, epochs=100, batch_size=20, verbose=0)
keras_rand_search = RandomizedSearchCV(keras_clf, keras_param_space, n_iter=20,
                                       cv=5, scoring="accuracy", n_jobs=-1, verbose=True)
keras_rand_search.fit(X, y)
# keras_rand_search.fit(X_train, y_train, epochs=100,
# validation_data=(X_valid, y_valid),
# callbacks=[keras.callbacks.EarlyStopping(patience=10)])
# summarize results
# The fitted search object itself carries best_score_, best_params_ and cv_results_.
grid_result = keras_rand_search
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
# One line per sampled candidate: mean CV score, its standard deviation, and the parameters.
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))
#=============== # Callbacks
# example
def on_step(optim_result):
    """Report the best score so far after each search iteration and stop early when good enough.

    Meant to be passed as ``callback=`` when fitting ``hpo_search`` (Bayesian optimization
    in Skopt). ``optim_result`` is accepted because the callback protocol supplies it; it
    is not used here.

    Returns:
        True once the best score reaches 0.98 (signals the search to stop), otherwise None.
    """
    # Read from the same search object the callback is registered on.
    score = hpo_search.best_score_
    print("best score: %s" % score)  # Prints score after each iteration
    if score >= 0.98:  # early-stopping kind of functionality
        print('Interrupting!')
        return True
hpo_search.fit(X_train, y_train, callback=on_step) # callback=on_step will print score after each iteration
#===============
# Deploy
# ---------------------------- Create end-point
import sagemaker  # needed for get_execution_role(); importing only SKLearn does not bind `sagemaker`
from sagemaker.sklearn.estimator import SKLearn

role = sagemaker.get_execution_role()
# Create the SKLearn Object by directing it to the aws_sklearn_main.py script
aws_sklearn = SKLearn(entry_point='aws_sklearn_main.py',
                      train_instance_type='ml.m4.xlarge',
                      role=role)
# Deploy model
# The deploy method creates the deployable model, configures the SageMaker hosting services endpoint, and launches the endpoint to host the model
# NOTE(review): no fit() call is visible before deploy(); presumably training happens elsewhere — confirm.
aws_sklearn_predictor = aws_sklearn.deploy(instance_type='ml.m4.xlarge',
                                           initial_instance_count=1,
                                           content_type='text/csv',)
# a prediction request to the endpoint
response = aws_sklearn_predictor.predict(data)
# NOTE(review): create_model() is called on the predictor here; verify it exists on this
# object in the installed SDK (it is normally an estimator method).
model = aws_sklearn_predictor.create_model()
# Print the endpoint to test in next step
print(aws_sklearn_predictor.endpoint)
# Tears down the endpoint container and deletes the corresponding endpoint configuration
# aws_sklearn_predictor.delete_endpoint()
# Deletes the model
# aws_sklearn_predictor.delete_model()
# ---------------------------- Create end-point from existing model
# To deploy an existing model locally, do not pass a sagemaker_session argument to the
# constructor; the correct session is generated when model.deploy() is called.
import numpy
from sagemaker.mxnet import MXNetModel

# S3 locations of the trained model artifact and of the packaged source directory.
model_location = 's3://mybucket/my_model.tar.gz'
code_location = 's3://mybucket/sourcedir.tar.gz'

s3_model = MXNetModel(
    model_data=model_location,
    role='SageMakerRole',
    entry_point='mnist.py',
    source_dir=code_location,
)
# instance_type='local' hosts the endpoint on this machine.
predictor = s3_model.deploy(initial_instance_count=1, instance_type='local')

# Send a single all-zeros array of shape (1, 1, 28, 28) as a smoke test.
data = numpy.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)
# ---------------------------- Call end-point
import boto3

# create sagemaker client using boto3
# client = boto3.client('sagemaker-runtime')
client = boto3.client('runtime.sagemaker')
# Specify endpoint and content_type
custom_attributes = "c000b4f9-df62-4c85-a0bf-7c525f9104a4" # Optional
endpoint_name = "endpoint_from_deployed_model_in_step_6"
content_type = "text/csv"
accept = "..." # The desired MIME type of the inference in the response.
request_body = "..."
# Make call to endpoint
# NOTE(review): custom_attributes and accept are defined above but not sent with the request;
# pass CustomAttributes=/Accept= if the endpoint should receive them.
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType=content_type,
    Body=request_body
)
# CustomAttributes is optional in the response, so read it without raising when it is absent.
print(response.get('CustomAttributes')) # Optional
# result = response['Body'].read().decode()
# print('Predicted label is {}.'.format(result))
## Useful excerpts
# A customer's model containers must respond to requests within 60 seconds. The model itself can have a maximum processing time of 60 seconds before responding to invocations. If your model is going to take 50-60 seconds of processing time, the SDK socket timeout should be set to be 70 seconds.
# we create the model objects with the image and model data. These model objects are used to deploy production variants on an endpoint. The models are developed by training ML models on different data sets, different algorithms or ML frameworks, and different hyperparameters
# ---------------------------- Call end-point2
from datetime import datetime

import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.session import production_variant

# NOTE(review): sm_session, role, model_url and model_url2 are not defined in this section;
# presumably a sagemaker Session, an execution role and two S3 model artifact URLs — confirm.

# Timestamped names keep repeated runs from colliding.
model_name = f"DEMO-xgb-churn-pred-{datetime.now():%Y-%m-%d-%H-%M-%S}"
model_name2 = f"DEMO-xgb-churn-pred2-{datetime.now():%Y-%m-%d-%H-%M-%S}"
# The two models use different builds of the XGBoost container image.
image_uri = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')
image_uri2 = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-2')

sm_session.create_model(name=model_name, role=role, container_defs={
    'Image': image_uri,
    'ModelDataUrl': model_url
})
sm_session.create_model(name=model_name2, role=role, container_defs={
    'Image': image_uri2,
    'ModelDataUrl': model_url2
})

# One production variant per model, with equal initial traffic weights.
variant1 = production_variant(model_name=model_name,
                              instance_type="ml.m5.xlarge",
                              initial_instance_count=1,
                              variant_name='Variant1',
                              initial_weight=1)
variant2 = production_variant(model_name=model_name2,
                              instance_type="ml.m5.xlarge",
                              initial_instance_count=1,
                              variant_name='Variant2',
                              initial_weight=1)

endpoint_name = f"DEMO-xgb-churn-pred-{datetime.now():%Y-%m-%d-%H-%M-%S}"
print(f"EndpointName={endpoint_name}")
# A single endpoint that hosts both variants.
sm_session.endpoint_from_production_variants(
    name=endpoint_name,
    production_variants=[variant1, variant2]
)
# invoke end-point
import time
from collections import deque

# Get a subset of the test data for a quick test: keep only the last 120 rows.
# A bounded deque does this in plain Python; the notebook shell magic `!tail -120` is not
# valid in a .py file.
with open('test_data/test-dataset-input-cols.csv', 'r') as src:
    tail_rows = deque(src, maxlen=120)
with open('test_data/test_sample_tail_input_cols.csv', 'w') as dst:
    dst.writelines(tail_rows)

print(f"Sending test traffic to the endpoint {endpoint_name}. \nPlease wait...")
# NOTE(review): sm_runtime is not defined in this section; presumably a boto3
# 'sagemaker-runtime' client — confirm.
with open('test_data/test_sample_tail_input_cols.csv', 'r') as f:
    for row in f:
        print(".", end="", flush=True)  # progress marker, one dot per request
        payload = row.rstrip('\n')
        sm_runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType="text/csv",
                                   Body=payload,
                                   TargetVariant="Variant1",  # optional
                                   )
        time.sleep(0.5)  # throttle the requests
print("Done!")
# ---------------------------- Update endpoints production variant
# Shift the traffic split between the two variants to 25% / 75%.
desired_weights = [
    {"DesiredWeight": weight, "VariantName": variant["VariantName"]}
    for weight, variant in ((25, variant1), (75, variant2))
]
sm.update_endpoint_weights_and_capacities(
    EndpointName=endpoint_name,
    DesiredWeightsAndCapacities=desired_weights,
)
# ---------------------------- Inference data formats
# Amazon SageMaker algorithms accept and produce several different MIME types for the HTTP payloads used in retrieving online and mini-batch predictions.
# At a minimum, you need to convert the data for the following: Inference request serialization (handled by you), Inference request deserialization (handled by the algorithm), Inference response serialization (handled by the algorithm), Inference response deserialization (handled by you)
# Any transformations performed on the training data should also be performed on the data before obtaining inference. The order of the features matters and must remain unchanged.
#
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment