Skip to content

Instantly share code, notes, and snippets.

@natashadsilva
Last active May 16, 2019 23:05
Show Gist options
  • Save natashadsilva/c6a0118e19971c7ebfc4cd663406f0bc to your computer and use it in GitHub Desktop.
Save natashadsilva/c6a0118e19971c7ebfc4cd663406f0bc to your computer and use it in GitHub Desktop.
Scoring with PMML toolkit
import os
import streamsx.ec
import streamsx.standard.utility as util
import streamsx.spl.op as op
from enum import IntEnum
from time import sleep
import time
import csv
from streamsx.topology.schema import StreamSchema
import streamsx.pmml as pmml
class CSVFileReader:
    """Source callable that streams drug records from a CSV file, forever.

    Each data row is converted to a dict with typed fields. When the end of
    the file is reached it is reopened and read again, so there is always
    data flowing (e.g. for a view on the stream).
    """

    def __init__(self, file_name, directory=None, delay=0.5):
        """
        Args:
            file_name: Name of the CSV file to read.
            directory: Directory containing ``file_name``. ``None`` (the
                default) means the ``etc`` subdirectory of the Streams
                application directory, looked up when the source runs.
            delay: Seconds to sleep after each emitted row; used instead of
                a throttle. Defaults to 0.5.
        """
        self.file_name = file_name
        self.directory = directory
        self.delay = delay

    def _path(self):
        """Return the full path of the CSV file to read."""
        directory = self.directory
        if directory is None:
            # Looked up here (at run time), not in __init__, which runs while
            # the topology is being built.
            directory = os.path.join(streamsx.ec.get_application_directory(), "etc")
        return os.path.join(directory, self.file_name)

    def __call__(self):
        """Yield one dict per data row of the CSV file, looping forever.

        Yields:
            dict with keys ``age`` (int), ``sex``, ``bloodPressure``,
            ``cholesterol``, ``referenceDrug`` (str),
            ``bloodSodiumConcentration``, ``bloodPotassionConcentration``
            (float), plus placeholders ``NA_to_K=0`` and ``predictedDrug=""``.

        The generator only finishes if a complete pass over the file yields
        no data rows (empty or header-only file); otherwise it would spin,
        reopening the file without ever emitting or sleeping.
        """
        drug_file = self._path()
        col_names = ["age", "sex", "bloodPressure", "cholesterol",
                     "bloodSodiumConcentration", "bloodPotassionConcentration",
                     "referenceDrug"]
        # run this indefinitely so that there will always be data for the view
        while True:
            emitted = False
            with open(drug_file, newline="") as handle:
                reader = csv.DictReader(handle, delimiter=",", fieldnames=col_names)
                # Skip the header line. The default avoids StopIteration
                # (turned into RuntimeError inside a generator) on an empty file.
                next(reader, None)
                # yield the lines in the file one at a time
                for row in reader:
                    emitted = True
                    yield dict(age=int(row["age"]),
                               sex=row["sex"],
                               bloodPressure=row["bloodPressure"],
                               cholesterol=row["cholesterol"],
                               bloodSodiumConcentration=float(row["bloodSodiumConcentration"]),
                               bloodPotassionConcentration=float(row["bloodPotassionConcentration"]),
                               referenceDrug=row["referenceDrug"],
                               NA_to_K=0,
                               predictedDrug="")
                    # use this instead of throttle
                    if self.delay > 0:
                        time.sleep(self.delay)
            if not emitted:
                return
class NA_to_K:
    """Callable that adds the sodium/potassium ratio to a record.

    The record (a dict) is updated in place with a ``"Na_to_K"`` entry equal
    to ``bloodSodiumConcentration / bloodPotassionConcentration`` and the
    same dict is returned, so it can be used directly in ``Stream.map``.
    """

    def __call__(self, frame):
        sodium = frame["bloodSodiumConcentration"]
        potassium = frame["bloodPotassionConcentration"]
        ratio = sodium / potassium
        frame["Na_to_K"] = ratio
        return frame
# Build the scoring topology and submit it to a distributed Streams instance.
# Topology was previously used without being imported (NameError).
from streamsx.topology.topology import Topology
from streamsx.topology import context

topo = Topology("Scoring_Simple")

rawData = StreamSchema(
    "tuple<int32 age, rstring sex, rstring bloodPressure, rstring cholesterol, float64 bloodSodiumConcentration, float64 bloodPotassionConcentration,rstring referenceDrug>"
)
# extend the schema for the score function
scoredData = rawData.extend(StreamSchema("tuple<float64 NA_to_K, rstring predictedDrug>"))

# Add files to the archive that is deployed to the node running the
# application. In this sample the dataset with the sample data must be
# present on the worker node (it is placed under `etc`).
topo.add_file_dependency(
    os.path.join(os.environ['DSX_PROJECT_DIR'], 'datasets', 'DRUG_SAMPLE.csv'),
    'etc')

# read the csv records from the file
records = topo.source(CSVFileReader("DRUG_SAMPLE.csv"))

# Calculate Na to K, then convert each dict to a tuple in the structured
# schema. Field order must match scoredData.
preprocess = records.map(NA_to_K()).map(lambda row: (row["age"], row["sex"],
                                                     row["bloodPressure"],
                                                     row["cholesterol"],
                                                     row["bloodSodiumConcentration"],
                                                     row["bloodPotassionConcentration"],
                                                     row["referenceDrug"],
                                                     row["Na_to_K"], row["predictedDrug"]),
                                        schema=scoredData)
preprocess.print(name="SPL schema")

# score the records
score = pmml.score(preprocess,
                   schema=scoredData,
                   model_input_attribute_mapping='Age=age,BP=bloodPressure,Cholesterol=cholesterol,Na_to_K=NA_to_K,Sex=sex',
                   model_path='../models/Drug/1/model',
                   model_output_attribute_mapping='predictedDrug=Drug.PredictedValue'
                   )
score.isolate().print(name="Print Score")
score_view = score.view(name="ScoredRecords_", description="Sample of scored records")

# Submission configuration. It was previously used without being defined
# (NameError).
# NOTE(review): add any environment-specific settings (e.g. Streams service
# instance details) to cfg before submitting.
cfg = {}
# Disable SSL certificate verification if necessary (e.g. self-signed certs).
cfg[context.ConfigParams.SSL_VERIFY] = False

# build and submit the app
submission_result = context.submit('DISTRIBUTED', topo, cfg)
print(submission_result)
# The submission_result object contains information about the running application, or job
if submission_result.job:
    print("JobId: ", submission_result.job.id, "Name: ", submission_result.job.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment