Last active
May 16, 2019 23:05
-
-
Save natashadsilva/c6a0118e19971c7ebfc4cd663406f0bc to your computer and use it in GitHub Desktop.
Scoring with PMML toolkit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
import time
from enum import IntEnum
from time import sleep

import streamsx.ec
import streamsx.pmml as pmml
import streamsx.spl.op as op
import streamsx.standard.utility as util
from streamsx.topology.schema import StreamSchema
from streamsx.topology.topology import Topology
class CSVFileReader:
    """Source callable that replays a drug-sample CSV file forever as dict records.

    Calling the instance returns a generator; ``topo.source`` iterates it to
    produce the stream.
    """

    def __init__(self, file_name, delay=0.50):
        """
        Args:
            file_name: CSV file to read. A relative name is resolved against the
                ``etc`` directory of the Streams application directory (where
                ``add_file_dependency(..., 'etc')`` places it); an absolute path
                is used as-is.
            delay: seconds to sleep after each emitted row (used instead of a
                throttle operator).
        """
        self.file_name = file_name
        self.delay = delay

    def _resolve_path(self):
        """Return the filesystem path of the CSV file to open."""
        if os.path.isabs(self.file_name):
            return self.file_name
        return os.path.join(streamsx.ec.get_application_directory(), "etc", self.file_name)

    def __call__(self):
        """Yield one dict per data row, re-reading the file from the start forever.

        The first line of the file is treated as a header and skipped. Each
        yielded dict holds the CSV columns converted to their schema types plus
        the placeholders ``NA_to_K=0`` and ``predictedDrug=""``. If a full pass
        over the file yields no data rows, the generator stops instead of
        re-opening the file in a tight loop.
        """
        drug_file = self._resolve_path()
        col_names = ["age", "sex", "bloodPressure", "cholesterol",
                     "bloodSodiumConcentration", "bloodPotassionConcentration", "referenceDrug"]
        # Run this indefinitely so that there will always be data for the view.
        while True:
            emitted = 0
            # newline="" is required by the csv module for correct parsing.
            with open(drug_file, newline="") as handle:
                reader = csv.DictReader(handle, delimiter=',', fieldnames=col_names)
                # Skip the header line. The default keeps StopIteration from escaping
                # the generator (a RuntimeError under PEP 479) on an empty file.
                next(reader, None)
                # Yield the lines in the file one at a time.
                for row in reader:
                    yield dict(age=int(row["age"]),
                               sex=row["sex"],
                               bloodPressure=row["bloodPressure"],
                               cholesterol=row["cholesterol"],
                               bloodSodiumConcentration=float(row["bloodSodiumConcentration"]),
                               bloodPotassionConcentration=float(row["bloodPotassionConcentration"]),
                               referenceDrug=row["referenceDrug"], NA_to_K=0, predictedDrug="")
                    emitted += 1
                    # Use this instead of throttle.
                    time.sleep(self.delay)
            if not emitted:
                # Nothing to replay; avoid busy-looping on an empty/header-only file.
                return
class NA_to_K:
    """Map callable that adds the sodium-to-potassium ratio to a record dict."""

    def __call__(self, frame):
        """Store ``bloodSodiumConcentration / bloodPotassionConcentration`` under ``"Na_to_K"``.

        The dict is updated in place and returned. When the potassium value is
        zero the ratio is undefined. In that case ``None`` is returned, so
        ``Stream.map`` drops the record instead of the operator failing with
        ZeroDivisionError.
        """
        potassium = frame["bloodPotassionConcentration"]
        if potassium == 0:
            return None
        frame["Na_to_K"] = frame["bloodSodiumConcentration"] / potassium
        return frame
# Build the application graph.
topo = Topology("Scoring_Simple")

# Schema of one raw record as read from the CSV file.
rawData = StreamSchema(
    "tuple<int32 age, rstring sex, rstring bloodPressure, rstring cholesterol, float64 bloodSodiumConcentration, float64 bloodPotassionConcentration,rstring referenceDrug>"
)
# Extend the schema for the score function: the derived Na/K ratio and the model output.
scoredData = rawData.extend(StreamSchema("tuple<float64 NA_to_K, rstring predictedDrug>"))

# Add files to be contained in the archive that is deployed to the node running the
# application. This sample needs the dataset with sample data under `etc` on the
# worker node.
topo.add_file_dependency(
    os.path.join(os.environ['DSX_PROJECT_DIR'], 'datasets', 'DRUG_SAMPLE.csv'), 'etc')

# Read the CSV records from the file.
records = topo.source(CSVFileReader("DRUG_SAMPLE.csv"))

# Calculate Na to K, then convert each dict into a tuple whose element order matches
# the attribute order of `scoredData`.
preprocess = records.map(NA_to_K()).map(
    lambda row: (row["age"], row["sex"],
                 row["bloodPressure"],
                 row["cholesterol"],
                 row["bloodSodiumConcentration"],
                 row["bloodPotassionConcentration"],
                 row["referenceDrug"],
                 row["Na_to_K"], row["predictedDrug"]),
    schema=scoredData)
preprocess.print(name="SPL schema")

# Score the records with the PMML model. The output mapping writes the model's
# Drug.PredictedValue into the `predictedDrug` attribute.
# NOTE(review): model_path is relative; presumably resolved against the notebook's
# working directory at build time — confirm.
score = pmml.score(preprocess,
                   schema=scoredData,
                   model_input_attribute_mapping='Age=age,BP=bloodPressure,Cholesterol=cholesterol,Na_to_K=NA_to_K,Sex=sex',
                   model_path='../models/Drug/1/model',
                   model_output_attribute_mapping='predictedDrug=Drug.PredictedValue')
score.isolate().print(name="Print Score")

# Expose a view of the scored stream so results can be sampled while the job runs.
score_view = score.view(name="ScoredRecords_", description="Sample of scored records")
from streamsx.topology import context

# Submission configuration. NOTE(review): `cfg` was used here without being defined
# in this script; presumably an earlier notebook cell creates it (e.g. with Streams
# instance details). Fall back to an empty dict only when it does not already exist,
# so such a value is not discarded.
if "cfg" not in globals():
    cfg = {}

# Disable SSL certificate verification if necessary.
# NOTE(review): this is unconditional; remove it when the instance certificate is
# trusted, since it turns certificate verification off for the submission.
cfg[context.ConfigParams.SSL_VERIFY] = False

# Build and submit the application as a distributed job.
submission_result = context.submit('DISTRIBUTED', topo, cfg)
print(submission_result)

# The submission_result object contains information about the running application, or job.
if submission_result.job:
    print("JobId: ", submission_result.job.id, "Name: ", submission_result.job.name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment