@randyphoa
Created November 11, 2022 07:07
Sample Python function that handles requests from an R Shiny app
def func():
    import os
    import ibm_db
    import subprocess
    import sqlalchemy
    import numpy as np
    import pandas as pd
    import ibm_watson_studio_lib
    import ibm_watson_machine_learning
    from sklearn.cluster import KMeans
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.compose import make_column_selector, make_column_transformer
    SPACE_ID = "1d1a7011-662f-4d25-88f0-7bfa1b37a798"
    API_KEY = "xxx"

    wml_credentials = {"username": "admin", "apikey": API_KEY, "url": os.environ["RUNTIME_ENV_APSX_URL"], "instance_id": "openshift", "version": "4.5"}
    wml_client = ibm_watson_machine_learning.APIClient(wml_credentials)
    wml_client.set.default_space(SPACE_ID)

    # Load the accident data asset stored in the deployment space
    wslib = ibm_watson_studio_lib.access_project_or_space()
    my_file = wslib.load_data("data.csv")
    my_file.seek(0)
    data = pd.read_csv(my_file, engine="python").fillna("")
    def get_accident_data(params):
        country = "'" + "','".join(params["COUNTRY"]) + "'"
        df = data.query(f"COUNTRY in ({country})")
        return df

    def get_accident_details(params):
        accident_id = [int(x) for x in params["ACCIDENT_ID"]]
        df = data.query("ACCIDENT_ID in @accident_id")
        return df
    def get_clusters(params):
        country = "'" + "','".join(params["COUNTRY"]) + "'"
        n_clusters = params["N_CLUSTERS"]
        features = params["FEATURES"]
        algorithm = params["ALGORITHM"]
        df = data.query(f"COUNTRY in ({country})")
        cols = []
        if "location" in features:
            cols = cols + ["LONGITUDE", "LATITUDE"]
        if "fatalities" in features:
            cols = cols + ["DRIVER_DEAD", "PASS_DEAD", "PED_DEAD"]
        if "injuries" in features:
            cols = cols + ["PASS_INJ", "PED_INJ"]
        if "collision" in features:
            cols = cols + ["COLLISIONTYPE"]
        # One-hot encode categorical columns, pass numeric columns through, then scale and cluster
        ct = make_column_transformer(
            (OneHotEncoder(sparse=False), make_column_selector(dtype_include=object)),
            remainder="passthrough",
        )
        pipeline = Pipeline(steps=[
            ("transform", ct),
            ("scale", MinMaxScaler()),
            ("model", KMeans(n_clusters=n_clusters, random_state=12345)),
        ])
        pipeline.fit(df[cols])
        df["CLUSTER"] = pipeline["model"].labels_ + 1
        return df[["ACCIDENT_ID", "COUNTRY", "LONGITUDE", "LATITUDE", "CLUSTER"]]
    def get_predictions(params):
        wslib = ibm_watson_studio_lib.access_project_or_space()
        asset_name = params["ASSET_NAME"]
        deployment_id = params["DEPLOYMENT_ID"]
        my_file = wslib.load_data(asset_name)
        my_file.seek(0)
        df = pd.read_csv(my_file).fillna("")
        accident_id = df.pop("ACCIDENT_ID")
        latitude = df["LATITUDE"]
        longitude = df["LONGITUDE"]
        payload = {"input_data": [{"values": df}]}
        results = wml_client.deployments.score(deployment_id, payload)["predictions"][0]
        prediction, probability = zip(*results["values"])
        labels = ["PREDICTION", "Road Maintenance", "Road Planning", "Street Lights", "Speed Limitation", "Traffic Lights"]
        df = pd.concat([pd.DataFrame(prediction), pd.DataFrame(probability).round(2)], axis=1)
        df.columns = labels
        df["ACCIDENT_ID"] = accident_id
        df["LATITUDE"] = latitude
        df["LONGITUDE"] = longitude
        return df
    def get_models(params):
        model_name = params["MODEL_NAME"]
        models = [
            {"ID": x["metadata"]["id"], "NAME": x["metadata"]["name"]}
            for x in wml_client.deployments.get_details()["resources"]
            if model_name in x["metadata"]["name"]
        ]
        return pd.DataFrame(models)

    def get_predict_input_files(params):
        predict_input = params["PREDICT_INPUT"]
        predict_input_files = [
            {"ID": x["metadata"]["asset_id"], "NAME": x["metadata"]["name"]}
            for x in wml_client.data_assets.get_details()["resources"]
            if predict_input in x["metadata"]["name"]
        ]
        return pd.DataFrame(predict_input_files)
    # Note: get_cluster_details and save_clusters_results are referenced below but are not
    # defined in this snippet; define them (or remove their entries) before deploying.
    ACTIONS_MAPPING = {
        "GET_ACCIDENT_DATA": get_accident_data,
        "GET_ACCIDENT_DETAILS": get_accident_details,
        "GET_CLUSTERS": get_clusters,
        "GET_CLUSTERS_DETAILS": get_cluster_details,
        "SAVE_CLUSTERS_RESULTS": save_clusters_results,
        "GET_PREDICTIONS": get_predictions,
        "GET_MODELS": get_models,
        "GET_PREDICT_INPUT_FILES": get_predict_input_files,
    }

    # Entry point invoked by Watson Machine Learning for each scoring request.
    # The R Shiny app sends {"ACTION": ..., "PARAMS": {...}} as the first value.
    def score(payload):
        req = payload["input_data"][0]["values"][0]
        action = req["ACTION"]
        params = req["PARAMS"]
        if action in ACTIONS_MAPPING:
            df = ACTIONS_MAPPING[action](params)
            return {"predictions": [{"fields": df.columns.tolist(), "values": df.values.tolist()}]}
        return {"predictions": [{"values": ["NO ACTION"]}]}

    return score
# The code below runs in the notebook (outside func) and assumes a wml_client
# configured with the same credentials and default space as above.
FUNCTION_NAME = "r-shiny-backend-function"  # placeholder name; replace with your own
FUNCTION_DEPLOYMENT_NAME = "r-shiny-backend-deployment"  # placeholder name; replace with your own

meta_props = {
    wml_client.repository.FunctionMetaNames.NAME: FUNCTION_NAME,
    wml_client.repository.FunctionMetaNames.SOFTWARE_SPEC_UID: wml_client.software_specifications.get_uid_by_name("runtime-22.1-py3.9"),
}
function_details = wml_client.repository.store_function(meta_props=meta_props, function=func)
function_id = wml_client.repository.get_function_id(function_details)

meta_props = {
    wml_client.deployments.ConfigurationMetaNames.NAME: FUNCTION_DEPLOYMENT_NAME,
    wml_client.deployments.ConfigurationMetaNames.ONLINE: {},
    wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": wml_client.hardware_specifications.get_id_by_name("M")},
}
deployments_details = wml_client.deployments.create(function_id, meta_props=meta_props)
deployment_id = wml_client.deployments.get_id(deployments_details)
deployment_id
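
For reference, here is a minimal sketch of how a client could call the deployed function once the code above has run. It assumes the wml_client and deployment_id defined above; the action name shown and the example country values are illustrative only, and the R Shiny app would post the same JSON payload to the deployment's REST scoring endpoint.

import pandas as pd

request_payload = {
    "input_data": [
        {
            "values": [
                {
                    "ACTION": "GET_ACCIDENT_DATA",
                    "PARAMS": {"COUNTRY": ["Singapore", "Malaysia"]},  # illustrative values only
                }
            ]
        }
    ]
}

# Invoke the deployed function and rebuild a DataFrame from the returned fields/values
response = wml_client.deployments.score(deployment_id, request_payload)
result = response["predictions"][0]
df = pd.DataFrame(result["values"], columns=result["fields"])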