Created
November 11, 2022 07:07
-
-
Save randyphoa/71eb41b1f02282f13300aa79f7d9eb26 to your computer and use it in GitHub Desktop.
Sample Python Function that handles request from R Shiny app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def func(): | |
import os | |
import ibm_db | |
import subprocess | |
import sqlalchemy | |
import numpy as np | |
import pandas as pd | |
import ibm_watson_studio_lib | |
import ibm_watson_machine_learning | |
from sklearn.cluster import KMeans | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import MinMaxScaler | |
from sklearn.preprocessing import OneHotEncoder | |
from sklearn.compose import make_column_selector, make_column_transformer | |
SPACE_ID = "1d1a7011-662f-4d25-88f0-7bfa1b37a798" | |
API_KEY = "xxx" | |
wml_credentials = {"username": "admin", "apikey": API_KEY, "url": os.environ["RUNTIME_ENV_APSX_URL"], "instance_id": "openshift", "version": "4.5"} | |
wml_client = ibm_watson_machine_learning.APIClient(wml_credentials) | |
wml_client.set.default_space(SPACE_ID) | |
wslib = ibm_watson_studio_lib.access_project_or_space() | |
my_file = wslib.load_data("data.csv") | |
my_file.seek(0) | |
data = pd.read_csv(my_file, engine="python").fillna("") | |
def get_accident_data(params): | |
country = "'" + "','".join(params["COUNTRY"]) + "'" | |
df = data.query(f"COUNTRY in ({country})") | |
return df | |
def get_accident_details(params): | |
# accident_id = ",".join(params["ACCIDENT_ID"]) | |
accident_id = [int(x) for x in params["ACCIDENT_ID"]] | |
df = data.query(f"ACCIDENT_ID in ({accident_id})") | |
return df | |
def get_clusters(params): | |
country = "'" + "','".join(params["COUNTRY"]) + "'" | |
n_clusters = params["N_CLUSTERS"] | |
features = params["FEATURES"] | |
algorithm = params["ALGORITHM"] | |
df = data.query(f"COUNTRY in ({country})") | |
cols = [] | |
if "location" in features: | |
cols = cols + ["LONGITUDE", "LATITUDE"] | |
if "fatalities" in features: | |
cols = cols + ["DRIVER_DEAD", "PASS_DEAD", "PED_DEAD"] | |
if "injuries" in features: | |
cols = cols + ["PASS_INJ", "PED_INJ"] | |
if "collision" in features: | |
cols = cols + ["COLLISIONTYPE"] | |
ct = make_column_transformer( | |
(OneHotEncoder(sparse=False), make_column_selector(dtype_include=object)), | |
remainder="passthrough" | |
) | |
pipeline = Pipeline(steps=[ | |
("transform", ct), | |
("scale", MinMaxScaler()), | |
("model", KMeans(n_clusters=n_clusters, random_state=12345)) | |
]) | |
pipeline.fit(df[cols]) | |
df["CLUSTER"] = pipeline["model"].labels_ + 1 | |
return df[["ACCIDENT_ID", "COUNTRY", "LONGITUDE", "LATITUDE", "CLUSTER"]] | |
def get_predictions(params): | |
wslib = ibm_watson_studio_lib.access_project_or_space() | |
asset_name = params["ASSET_NAME"] | |
deployment_id = params["DEPLOYMENT_ID"] | |
my_file = wslib.load_data(asset_name) | |
my_file.seek(0) | |
df = pd.read_csv(my_file).fillna("") | |
accident_id = df.pop("ACCIDENT_ID") | |
latitude = df["LATITUDE"] | |
longitude = df["LONGITUDE"] | |
payload = {"input_data": [{"values": df}]} | |
results = wml_client.deployments.score(deployment_id, payload)["predictions"][0] | |
prediction, probability = zip(*results["values"]) | |
labels = ["PREDICTION", "Road Maintenance", "Road Planning", "Street Lights", "Speed Limitation", "Traffic Lights"] | |
df = pd.concat([pd.DataFrame(prediction), pd.DataFrame(probability).round(2)], axis=1) | |
df.columns = labels | |
df["ACCIDENT_ID"] = accident_id | |
df["LATITUDE"] = latitude | |
df["LONGITUDE"] = longitude | |
return df | |
def get_models(params): | |
model_name = params["MODEL_NAME"] | |
models = [{"ID": x["metadata"]["id"], "NAME":x["metadata"]["name"]} for x in wml_client.deployments.get_details()["resources"] if model_name in x["metadata"]["name"]] | |
return pd.DataFrame(models) | |
def get_predict_input_files(params): | |
predict_input = params["PREDICT_INPUT"] | |
predict_input_files = [{"ID": x["metadata"]["asset_id"], "NAME":x["metadata"]["name"]} for x in wml_client.data_assets.get_details()["resources"] if predict_input in x["metadata"]["name"]] | |
return pd.DataFrame(predict_input_files) | |
ACTIONS_MAPPING = { | |
"GET_ACCIDENT_DATA": get_accident_data, | |
"GET_ACCIDENT_DETAILS": get_accident_details, | |
"GET_CLUSTERS": get_clusters, | |
"GET_CLUSTERS_DETAILS": get_cluster_details, | |
"SAVE_CLUSTERS_RESULTS": save_clusters_results, | |
"GET_PREDICTIONS": get_predictions, | |
"GET_MODELS": get_models, | |
"GET_PREDICT_INPUT_FILES": get_predict_input_files, | |
} | |
def score(payload): | |
req = payload["input_data"][0]["values"][0] | |
action = req["ACTION"] | |
params = req["PARAMS"] | |
if action in ACTIONS_MAPPING: | |
df = ACTIONS_MAPPING[action](params) | |
return {"predictions": [{"fields": df.columns.tolist(), "values": df.values.tolist()}]} | |
return {"predictions": [{"values": ["NO ACTION"]}]} | |
return score | |
meta_props = { | |
wml_client.repository.FunctionMetaNames.NAME: FUNCTION_NAME, | |
wml_client.repository.FunctionMetaNames.SOFTWARE_SPEC_UID: wml_client.software_specifications.get_uid_by_name("runtime-22.1-py3.9"), | |
} | |
function_details = wml_client.repository.store_function(meta_props=meta_props, function=func) | |
function_id = wml_client.repository.get_function_id(function_details) | |
meta_props = { | |
wml_client.deployments.ConfigurationMetaNames.NAME: FUNCTION_DEPLOYMENT_NAME, | |
wml_client.deployments.ConfigurationMetaNames.ONLINE: {}, | |
wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": wml_client.hardware_specifications.get_id_by_name("M")}, | |
} | |
deployments_details = wml_client.deployments.create(function_id, meta_props=meta_props) | |
deployment_id = wml_client.deployments.get_id(deployments_details) | |
deployment_id |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment