An example SeldonDeployment that uses MLflow to store the model
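"Modified from the [MLflow `sklearn_elasticnet_wine` example](https://github.com/mlflow/mlflow/tree/master/examples/sklearn_elasticnet_wine)."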
"import os\n",
"import warnings\n",
"import sys\n",
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.linear_model import ElasticNet\n",
"from urllib.parse import urlparse\n",
"import mlflow\n",
"import mlflow.sklearn\n",
"import logging\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"\n",
"def eval_metrics(actual, pred):\n",
"    \"\"\"Score predictions against the ground truth.\n",
"\n",
"    Returns an (rmse, mae, r2) tuple: root-mean-squared error, mean absolute\n",
"    error and the R^2 score, all computed with sklearn.metrics.\n",
"    \"\"\"\n",
"    mse = mean_squared_error(actual, pred)\n",
"    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)\n"
"from IPython.core.magic import register_line_cell_magic\n",
"\n",
"\n",
"@register_line_cell_magic\n",
"def writetemplate(line, cell):\n",
"    \"\"\"Cell magic: write the cell body to the file named on the magic line.\n",
"\n",
"    The body is used as a str.format template filled from the notebook globals,\n",
"    so `{name}` placeholders are substituted and literal braces must be doubled.\n",
"    \"\"\"\n",
"    # The decorator is what makes `%%writetemplate <file>` usable in later cells.\n",
"    with open(line.strip(), \"w\", encoding=\"utf-8\") as f:\n",
"        f.write(cell.format(**globals()))"
"# Create a model"
"# Read the wine-quality csv file from the URL\n",
"csv_url = (\n",
"    \"http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv\"\n",
")\n",
"try:\n",
"    data = pd.read_csv(csv_url, sep=\";\")\n",
"except Exception as e:\n",
"    logger.exception(\n",
"        \"Unable to download training & test CSV, check your internet connection. Error: %s\", e\n",
"    )\n",
"    # Re-raise: every later cell needs `data`, so continuing would only fail with a NameError.\n",
"    raise\n",
"\n",
"# Split the data into training and test sets. (0.75, 0.25) split.\n",
"train, test = train_test_split(data)\n",
"\n",
"# The predicted column is \"quality\" which is a scalar from [3, 9]\n",
"train_x = train.drop([\"quality\"], axis=1)\n",
"test_x = test.drop([\"quality\"], axis=1)\n",
"train_y = train[[\"quality\"]]\n",
"test_y = test[[\"quality\"]]\n",
"\n",
"alpha = 0.5\n",
"l1_ratio = 0.5"
"Elasticnet model (alpha=0.500000, l1_ratio=0.500000):\n",
" RMSE: 0.793164022927685\n",
" MAE: 0.6271946374319586\n",
" R2: 0.10862644997792636\n"
"lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)\n",
"# Fit on the training split, then score on the held-out test split\n",
"lr.fit(train_x, train_y)\n",
"\n",
"predicted_qualities = lr.predict(test_x)\n",
"(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)\n",
"\n",
"print(\"Elasticnet model (alpha=%f, l1_ratio=%f):\" % (alpha, l1_ratio))\n",
"print(\" RMSE: %s\" % rmse)\n",
"print(\" MAE: %s\" % mae)\n",
"print(\" R2: %s\" % r2)\n"
"# Do an example prediction"
"[[ 0 1 2 3 4 5 6 7 8 9 10]]\n"
"# A single row with 11 feature values (0..10), shape (1, 11)\n",
"dummy_data = np.array(range(11))[np.newaxis,:]\n",
"print(dummy_data)\n",
"# Local prediction, to compare with the response from the Seldon deployment later on\n",
"lr.predict(dummy_data)"
"# Store parameters and model in mlflow"
"# Cluster IPs of the MLflow tracking server and the MinIO object store (both in the kubeflow namespace)\n",
"mlflow_ip = !kubectl -n kubeflow get svc mlflow-server -o jsonpath='{.spec.clusterIP}'\n",
"minio_ip = !kubectl -n kubeflow get svc minio -o jsonpath='{.spec.clusterIP}'\n",
"mlflow_tracking_uri = f\"http://{mlflow_ip[0]}:5000\"\n",
"mlflow_s3_endpoint_url = f\"http://{minio_ip[0]}:9000\"\n",
"\n",
"# MinIO credentials. The secret key could instead be read with\n",
"# `kubectl get secret mlflow-server-seldon-init-container-s3-credentials -o jsonpath='{.data.RCLONE_CONFIG_S3_SECRET_ACCESS_KEY}' | base64 -d`\n",
"aws_access_key_id = \"minio\"\n",
"aws_secret_access_key = \"X9KGX9W9C7GVTKPV4T1MMTP4V9ZT60\"\n",
"\n",
"# Tracking URI from the mlflow-server service; S3 endpoint and credentials for the minio service\n",
"os.environ.update({\n",
"    \"MLFLOW_TRACKING_URI\": mlflow_tracking_uri,\n",
"    \"MLFLOW_S3_ENDPOINT_URL\": mlflow_s3_endpoint_url,\n",
"    \"AWS_ACCESS_KEY_ID\": aws_access_key_id,\n",
"    \"AWS_SECRET_ACCESS_KEY\": aws_secret_access_key,\n",
"})"
"# Logging without an explicit run implicitly creates one (presumably in the default experiment)\n",
"# and makes it mlflow.active_run(). A `with mlflow.start_run(run_name='test_run'):` block\n",
"# would scope the run explicitly instead.\n",
"for param_name, param_value in ((\"alpha\", alpha), (\"l1_ratio\", l1_ratio)):\n",
"    mlflow.log_param(param_name, param_value)\n",
"for metric_name, metric_value in ((\"rmse\", rmse), (\"r2\", r2), (\"mae\", mae)):\n",
"    mlflow.log_metric(metric_name, metric_value)\n"
"Bucket mlflow already exists - using existing bucket\n"
"# Patch to create bucket in minio if it doesn't exist already\n",
"import boto3\n",
"import botocore.exceptions\n",
"import os\n",
"\n",
"\n",
"def create_default_bucket(default_bucket):\n",
"    \"\"\"Ensure `default_bucket` exists and is accessible, creating it if it does not.\n",
"\n",
"    Connects using MLFLOW_S3_ENDPOINT_URL, AWS_ACCESS_KEY_ID and\n",
"    AWS_SECRET_ACCESS_KEY from the environment.\n",
"\n",
"    Raises:\n",
"        ValueError: if the bucket can be neither accessed nor created.\n",
"    \"\"\"\n",
"    c = boto3.client(\n",
"        's3',\n",
"        endpoint_url=os.environ[\"MLFLOW_S3_ENDPOINT_URL\"],\n",
"        aws_access_key_id=os.environ[\"AWS_ACCESS_KEY_ID\"],\n",
"        aws_secret_access_key=os.environ[\"AWS_SECRET_ACCESS_KEY\"],\n",
"    )\n",
"    try:\n",
"        # If this succeeds, the bucket exists already and we have access to it\n",
"        c.head_bucket(Bucket=default_bucket)\n",
"        print(f\"Bucket {default_bucket} already exists - using existing bucket\")\n",
"    except botocore.exceptions.ClientError:\n",
"        # Otherwise the bucket is missing or we lack access to it. Try to create it\n",
"        try:\n",
"            print(f\"Creating bucket {default_bucket}\")\n",
"            c.create_bucket(Bucket=default_bucket)\n",
"        except botocore.exceptions.ClientError as e:\n",
"            # Cannot create the bucket, so model artifacts could not be stored\n",
"            message = \"Could not access/create the default artifact storage bucket.\" \\\n",
"                f\" Got error: {str(e)}\"\n",
"            raise ValueError(message) from e\n",
"\n",
"\n",
"# MLflow's artifact store uses the `mlflow` bucket; it must exist before log_model runs.\n",
"create_default_bucket(\"mlflow\")"
"Registered model 'ElasticnetWineModel' already exists. Creating a new version of this model...\n",
"2022/03/04 15:58:21 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: ElasticnetWineModel, version 2\n",
"Created version '2' of model 'ElasticnetWineModel'.\n"
"ModelInfo(artifact_path='model', flavors={'python_function': {'model_path': 'model.pkl', 'loader_module': 'mlflow.sklearn', 'python_version': '3.9.7', 'env': 'conda.yaml'}, 'sklearn': {'pickled_model': 'model.pkl', 'sklearn_version': '1.0.2', 'serialization_format': 'cloudpickle'}}, model_uri='runs:/7ea073ed2bc643e7bb5f593e53269d4d/model', model_uuid='5b89ca5b8b554f7c8b0fdd07e3c80d90', run_id='7ea073ed2bc643e7bb5f593e53269d4d', saved_input_example_info=None, signature_dict=None, utc_time_created='2022-03-04 20:58:19.053647')"
"# Store the fitted model under the run's \"model\" artifact path and register a new model version\n",
"mlflow.sklearn.log_model(lr, \"model\", registered_model_name=\"ElasticnetWineModel\")"
"# If needed, create the secret (if you're using an old mlflow charm, it won't be created for you)"
"# Namespace that the Seldon credentials secret is applied to\n",
"namespace_for_seldondeployment = 'admin'"
"from base64 import b64encode\n",
"\n",
"\n",
"def _b64_encode_dict(d):\n",
"    \"\"\"Return a copy of `d` with every (str) value base64-encoded.\n",
"\n",
"    b64encode takes and returns bytes, so each value is first encoded to UTF-8 and\n",
"    the base64 result is decoded back to str so it can be written into text templates.\n",
"    \"\"\"\n",
"    encoded = {}\n",
"    for key, value in d.items():\n",
"        raw = value.encode(\"utf-8\")\n",
"        encoded[key] = b64encode(raw).decode(\"utf-8\")\n",
"    return encoded\n",
"\n",
"\n",
"def _seldon_credentials_dict(obj_storage):\n",
"    \"\"\"Build the RCLONE_CONFIG_S3_* settings for Seldon's init container, base64-encoded.\n",
"\n",
"    `obj_storage` must provide the keys access-key, secret-key, service, namespace and port.\n",
"    \"\"\"\n",
"    return _b64_encode_dict({\n",
"        \"RCLONE_CONFIG_S3_TYPE\": \"s3\",\n",
"        \"RCLONE_CONFIG_S3_PROVIDER\": \"minio\",\n",
"        \"RCLONE_CONFIG_S3_ACCESS_KEY_ID\": obj_storage[\"access-key\"],\n",
"        \"RCLONE_CONFIG_S3_SECRET_ACCESS_KEY\": obj_storage[\"secret-key\"],\n",
"        \"RCLONE_CONFIG_S3_ENDPOINT\": f\"http://{obj_storage['service']}.{obj_storage['namespace']}:{obj_storage['port']}\",\n",
"        \"RCLONE_CONFIG_S3_ENV_AUTH\": \"false\",\n",
"    })\n",
"# MinIO connection details; the credentials reuse aws_access_key_id / aws_secret_access_key\n",
"# set in the tracking-configuration cell instead of repeating the secret key here.\n",
"obj_storage = {\n",
"    \"access-key\": aws_access_key_id,\n",
"    \"secret-key\": aws_secret_access_key,\n",
"    \"service\": \"minio\",\n",
"    \"namespace\": \"kubeflow\",\n",
"    \"port\": \"9000\",\n",
"}\n",
"secret_data = _seldon_credentials_dict(obj_storage)"
"{'RCLONE_CONFIG_S3_TYPE': 'czM=',\n",
" 'RCLONE_CONFIG_S3_PROVIDER': 'bWluaW8=',\n",
" 'RCLONE_CONFIG_S3_ENDPOINT': 'aHR0cDovL21pbmlvLmt1YmVmbG93OjkwMDA=',\n",
"# One `key: value` line per entry, indented to sit under `data:` in the Secret template\n",
"serialized_secret_data = \"\\n\".join(f\"  {k}: {v}\" for k, v in secret_data.items())"
"%%writetemplate seldon_secret.yaml\n",
"apiVersion: v1\n",
"kind: Secret\n",
"metadata:\n",
"  name: mlflow-server-seldon-init-container-s3-credentials\n",
"type: Opaque\n",
"data:\n",
"{serialized_secret_data}"
"secret/mlflow-server-seldon-init-container-s3-credentials configured\n"
"# Create or update the credentials secret in the deployment namespace\n",
"!kubectl apply -n $namespace_for_seldondeployment -f seldon_secret.yaml"
"# Deploy the model using Seldon"
"run = mlflow.active_run()\n",
"if run is None:\n",
"    raise RuntimeError(\"No active MLflow run - run the MLflow logging cells first\")\n",
"# Presumably of the form s3://mlflow/<experiment_id>/<run_id>/artifacts; the model was logged under \"model\"\n",
"artifact_uri = run.info.artifact_uri"
"%%writetemplate sd_example.yaml\n",
"apiVersion: machinelearning.seldon.io/v1\n",
"kind: SeldonDeployment\n",
"metadata:\n",
"  name: mlflow\n",
"spec:\n",
"  name: wines\n",
"  predictors:\n",
"  - componentSpecs:\n",
"    - spec:\n",
"        # We are setting high failureThreshold as installing conda dependencies\n",
"        # can take long time and we want to avoid k8s killing the container prematurely\n",
"        containers:\n",
"        - name: classifier\n",
"          image: seldonio/mlflowserver:1.14.0-dev # Patches a package conflict in the default version\n",
"          livenessProbe:\n",
"            initialDelaySeconds: 80\n",
"            failureThreshold: 200\n",
"            periodSeconds: 5\n",
"            successThreshold: 1\n",
"            httpGet:\n",
"              path: /health/ping\n",
"              port: http\n",
"              scheme: HTTP\n",
"          readinessProbe:\n",
"            initialDelaySeconds: 80\n",
"            failureThreshold: 200\n",
"            periodSeconds: 5\n",
"            successThreshold: 1\n",
"            httpGet:\n",
"              path: /health/ping\n",
"              port: http\n",
"              scheme: HTTP\n",
"    graph:\n",
"      children: []\n",
"      implementation: MLFLOW_SERVER\n",
"      # mlflow/experimentid/runid/artifacts/model\n",
"      modelUri: {artifact_uri}/model\n",
"      envSecretRefName: mlflow-server-seldon-init-container-s3-credentials\n",
"      name: classifier\n",
"    name: default\n",
"    replicas: 1\n"
" created\n"
"# Create it in the namespace holding the secret that envSecretRefName points to\n",
"!kubectl create -f sd_example.yaml -n $namespace_for_seldondeployment"
"Waiting for deployment \"mlflow-default-0-classifier\" rollout to finish: 0 of 1 updated replicas are available...\n",
"deployment \"mlflow-default-0-classifier\" successfully rolled out\n"
"# The jsonpath goes through a variable: IPython leaves a ! command entirely unexpanded\n",
"# (including $variables) when a literal {...} in it cannot be evaluated as Python.\n",
"jsonpath = \"{.items[0].metadata.name}\"\n",
"deployment_name = !kubectl get deploy -l seldon-deployment-id=mlflow -n $namespace_for_seldondeployment -o jsonpath=$jsonpath\n",
"deployment_name = deployment_name[0]\n",
"!kubectl rollout status deploy/$deployment_name -n $namespace_for_seldondeployment\n"
"# Predict with the deployment"
"### Create some dummy data"
"data_escaped='\\\\{\\\\\"data\\\\\":\\\\ \\\\{\\\\\"ndarray\\\\\":\\\\ \\\\[\\\\[0,\\\\ 1,\\\\ 2,\\\\ 3,\\\\ 4,\\\\ 5,\\\\ 6,\\\\ 7,\\\\ 8,\\\\ 9,\\\\ 10\\\\]\\\\]\\\\}\\\\}'\n"
"import json\n",
"import urllib.request\n",
"\n",
"# Request body in Seldon's JSON protocol: {\"data\": {\"ndarray\": [[...]]}}\n",
"data_json = json.dumps({\"data\": {\"ndarray\": dummy_data.tolist()}})\n",
"\n",
"# SeldonDeployment Service IP. The jsonpath is passed through a variable because IPython\n",
"# skips $variable expansion in a ! command that contains an unevaluable literal {...}.\n",
"svc_jsonpath = \"{.items[0].spec.clusterIP}\"\n",
"seldondeployment_predict_ip = !kubectl get svc -l seldon-deployment-id=mlflow -n $namespace_for_seldondeployment -o jsonpath=$svc_jsonpath\n",
"seldondeployment_predict_endpoint = f\"http://{seldondeployment_predict_ip[0]}:9000/predict\"\n",
"\n",
"# POST from Python rather than curl, so the JSON payload needs no shell escaping\n",
"request = urllib.request.Request(\n",
"    seldondeployment_predict_endpoint,\n",
"    data=data_json.encode(\"utf-8\"),\n",
"    headers={\"Content-Type\": \"application/json\"},\n",
"    method=\"POST\",\n",
")\n",
"with urllib.request.urlopen(request) as response:\n",
"    print(response.read().decode(\"utf-8\"))"
"Here we see that the returned ndarray is the same as our original prediction above."
