Example script of training a random forest using the H2O Python module
import sys
# include config file location
import traceback
PROJECT_HOME = '/path/to/my/project_home'
sys.path.insert(0, PROJECT_HOME)
import datetime
import time
import numpy as np
import re
from IPython.display import display
import pprint
import os
import subprocess
import sklearn.metrics
import h2o
# Standalone testing script for benchmarking algo using common dataset
# https://archive.ics.uci.edu/ml/datasets/Car+Evaluation
def quit_program(h2o_thread):
    """
    If you don't close the h2o cluster explicitly, it stays open even if the script itself fails
    :param h2o_thread: thread streaming output from the h2o service process
    :return:
    """
    print()
    if h2o_thread is not None:
        if h2o_thread.is_alive():
            h2o.cluster().shutdown()
print('h2o python package version: %s' % h2o.__version__)
# TODO load in dataset and prep
# get DB input data
import pyodbc
import pandas as pd
pd.set_option('display.max_columns', None)
raw_data = pd.read_csv("/path/to/benchmark/data/car.data", header=0)
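# NOTE: the raw car.data file from the UCI repository ships without a header row,
# so header=0 would consume the first data record as column names. If using the
# unmodified download, a read like the following may be needed instead (column
# order per the UCI attribute description; an assumption, verify locally):
# raw_data = pd.read_csv("/path/to/benchmark/data/car.data", header=None,
#                        names=['buying', 'maint', 'doors', 'persons',
#                               'lug_boot', 'safety', 'value'])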
# inspect data
print('frame shape = {}'.format(raw_data.shape))
print(raw_data.dtypes)
display(raw_data.head(n=5))
raw_data.info(verbose=True)  # .info() prints directly and returns None, so no display() wrapper is needed
display(raw_data.describe())
frame_in = raw_data
# inspect data again after reassignment
print('frame shape = {}'.format(frame_in.shape))
print(frame_in.dtypes)
frame_in.head(n=5)
# TODO h2o activation
# startup hadoop h2o cluster
import shlex
from queue import Queue, Empty
from threading import Thread
def enqueue_output(out, queue):
    """
    Function for communicating streaming text lines from a separate thread.
    see https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
    """
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()
print(os.environ)
# clear legacy temp. dir.
# if os.path.isdir(CFG.h2o_legacy_dir):
# print subprocess.check_output(shlex.split(CFG.startup_cmds[0]))
t = None  # so the finally clause below is safe even if startup fails early
try:
    # start h2o service in background thread
    HADOOP_H2O_JAR = os.path.join(PROJECT_HOME, 'resources', 'h2o-3.32.1.3-hdp3.1', 'h2odriver.jar')
    HADOOP_STARTUP_CMD = '/bin/hadoop jar {} -nodes 2 -mapperXmx 4g -timeout 300'.format(HADOOP_H2O_JAR)
    LOCAL_H2O_JAR = os.path.join(PROJECT_HOME, 'resources', 'h2o-3.32.1.3', 'h2o.jar')
    LOCAL_STARTUP_CMD = 'java -Xmx8g -jar {}'.format(LOCAL_H2O_JAR)
    startup_p = subprocess.Popen(shlex.split(LOCAL_STARTUP_CMD),
                                 shell=False,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # setup message passing queue
    q = Queue()
    t = Thread(target=enqueue_output, args=(startup_p.stdout, q))
    t.daemon = True  # thread dies with the program
    t.start()
    # read lines without blocking
    h2o_url_out = ''
    while True:
        try:
            line = q.get_nowait().decode('utf-8')  # or q.get(timeout=.1)
        except Empty:
            continue
        else:  # got line
            print(line)
            # check for first instance of connection url output
            if re.search('Open H2O Flow in your web browser', line) is not None:
                h2o_url_out = line
                break
            if re.search('error', line, re.IGNORECASE) is not None:
                print('Error generated: %s' % line)
                print('Shutting down h2o cluster')
                if t.is_alive() and h2o.cluster():
                    h2o.cluster().shutdown()
                print('exiting program')
                sys.exit()
    print('Connection url output line: %s' % h2o_url_out)
    h2o_cnxn_ip = re.search(r'(?<=Open H2O Flow in your web browser: http:\/\/)(.*?)(?=:)', h2o_url_out).group(1)
    print('H2O connection ip: %s' % h2o_cnxn_ip)

    # connect to h2o service
    h2o.init(ip=h2o_cnxn_ip)
    h2o.cluster().show_status()
    h2o.ls()
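    # Note: when an h2o cluster is already running (e.g. started by hand, or on
    # hadoop via HADOOP_STARTUP_CMD above), h2o.init() can be pointed at it
    # directly with ip/port and the url-scraping above becomes unnecessary; with
    # no arguments h2o.init() checks localhost:54321 and starts a local cluster
    # if none is found.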
    # TODO dx_outcome training test
    display(frame_in.value.value_counts())
    display(frame_in.head(n=3))
    # convert data to h2o workable structure
    # see http://h2o-release.s3.amazonaws.com/h2o/master/3494/docs-website/h2o-py/docs/data.html#loading-data-from-a-python-object
    print('Converting local python data set to H2O data frame')
    print('Checking that input data struct is convertible to H2OFrame: %s'
          % isinstance(frame_in, (tuple, list, dict, pd.DataFrame)))
    assert isinstance(frame_in, (tuple, list, dict, pd.DataFrame))
    COLUMN_TYPES = {
        'buying': 'enum',
        'maint': 'enum',
        'doors': 'enum',
        'persons': 'enum',
        'lug_boot': 'enum',
        'safety': 'enum',
        'value': 'enum'
    }
    frame_in_h2o = h2o.H2OFrame(
        # shuffling training dataset as well
        frame_in.sample(frac=1, random_state=64).reset_index(drop=True),
        column_types=COLUMN_TYPES)
    print('H2O dataframe diagnostics:')
    print('types: %s' % frame_in_h2o.types)
    print('row count: %d' % frame_in_h2o.nrow)
    print(len(frame_in_h2o))
    frame_in_h2o.head(rows=3)
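    # Alternatively, h2o could ingest the csv directly and skip the pandas round
    # trip entirely (a commented-out sketch reusing the COLUMN_TYPES dict above;
    # untested against this file):
    # frame_in_h2o = h2o.import_file("/path/to/benchmark/data/car.data",
    #                                col_types=COLUMN_TYPES)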
    version = 0.01
    train_response = 'value'
    drf_dx = h2o.estimators.H2ORandomForestEstimator(
        # denoting update version name by epoch timestamp
        model_id=f'model_v{str(version)}T{str(int(time.time()))}',
        response_column=train_response,
        # weights_column='weight',  # Balance classes and observation weights are not currently supported together
        ntrees=64,
        nbins=32,
        balance_classes=True,
        binomial_double_trees=True)
    print(drf_dx.model_id)
    print(drf_dx.__module__, type(drf_dx).__name__)
    train_u, val_u = frame_in_h2o.split_frame(ratios=[0.80])
    train_features = [label for label in COLUMN_TYPES if label != 'value']
    train_response = 'value'
    print(train_features)
    print(train_response)
    display(train_u.head(rows=3))
    display(val_u.head(rows=3))
    max_train_time_hrs = 3
    drf_dx.train(x=train_features, y=train_response,
                 training_frame=train_u, validation_frame=val_u,
                 max_runtime_secs=max_train_time_hrs * 60 * 60)
    pprint.pprint(drf_dx.params['response_column'])
    pprint.pprint(drf_dx.params)
    print('\n\nParams...\n\n')
    print(drf_dx.get_params())
    print(drf_dx.confusion_matrix(metrics=['f0point5', 'f1', 'f2', 'accuracy'], train=False, valid=True))
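    # Note: threshold-based metrics like f0point5/f1/f2 come from binomial
    # models; since this car-evaluation target has four levels, h2o builds a
    # multinomial model here, and the plain multiclass confusion matrix would be
    # the applicable view instead (a sketch):
    # print(drf_dx.model_performance(valid=True).confusion_matrix())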
    # fitting on the data to save results
    # predictions = drf_dx.predict(frame_in_h2o)
    predictions = drf_dx.predict(val_u)
    print(predictions.describe())

    # activation_thresh_target = 'accuracy'  # most straightforward
    activation_thresh_target = 'f0point5'  # reduce FPs, more conservative (?)
    # activation_thresh_target = 'f2'  # reduce FNs, more aggressive (?)
    activation_thresh = drf_dx.find_threshold_by_max_metric(activation_thresh_target, valid=True)
    # in the case of this dataset, there are multiple classes as opposed to just binary, but this is just for demo purposes
    neg_class, pos_class = frame_in_h2o[train_response].categories()[0], frame_in_h2o[train_response].categories()[-1]
    print(f"Activation thresh: {activation_thresh}, neg class: {neg_class}, pos class: {pos_class}")

    # TODO: fix, throws bytecode errors
    # predictions['predict'] = predictions[pos_class].apply(
    #     lambda prob: pos_class if (prob >= activation_thresh) else neg_class)

    # adjust for target threshold and convert class labels to numbers for final verification,
    # since I think sklearn.metrics can only work with number labels
    predictions['predict'] = (predictions[pos_class] > activation_thresh).ifelse(pos_class, neg_class)
    predictions['predicted_index'] = (predictions[pos_class] > activation_thresh).ifelse(1, 0)
    predictions['true_label'] = val_u[train_response]
    predictions['true_index'] = (predictions['true_label'] == pos_class).ifelse(1, 0)
    display(predictions.head())

    frame_w_preds = val_u.concat(predictions, axis=1)
    print(frame_w_preds.describe())
    predictions_df = frame_w_preds.as_data_frame()
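    # While the cluster is still up, the trained model could also be persisted
    # for reuse (a sketch; the target path is an assumption):
    # model_path = h2o.save_model(model=drf_dx, path=os.path.join(PROJECT_HOME, 'models'), force=True)
    # print('model saved to %s' % model_path)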
except Exception as e:
    print(e)
    traceback.print_exc()
finally:
    # TODO this does not actually fully kill the yarn application
    # (need to do manually: yarn application -kill application_<app ID>)
    quit_program(t)
# double checking results on validation set
print('ROC AUC')
print(sklearn.metrics.roc_auc_score(y_true=predictions_df['true_index'],
                                    y_score=predictions_df['predicted_index']))
print('PR AUC')
print(sklearn.metrics.average_precision_score(y_true=predictions_df['true_index'],
                                              y_score=predictions_df['predicted_index'],
                                              pos_label=1))
print('Accuracy')
print(sklearn.metrics.accuracy_score(y_true=predictions_df['true_index'],
                                     y_pred=predictions_df['predicted_index']))
print('F1')
print(sklearn.metrics.f1_score(y_true=predictions_df['true_index'],
                               y_pred=predictions_df['predicted_index'],
                               pos_label=1))
print('Confusion Matrix')
print(sklearn.metrics.confusion_matrix(y_true=predictions_df['true_index'],
                                       y_pred=predictions_df['predicted_index']))
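# a per-class breakdown of the same predictions can also be handy (a sketch,
# reusing the predictions_df built above):
# print(sklearn.metrics.classification_report(y_true=predictions_df['true_index'],
#                                             y_pred=predictions_df['predicted_index']))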
# predictions_df.loc[predictions_df[pos_class] >= activation_thresh, 'predict'] = pos_class
# predictions_df.loc[predictions_df[pos_class] < activation_thresh, 'predict'] = neg_class
display(predictions_df.head())
now = datetime.datetime.now()
predictions_df.to_csv(
f"/path/to/my/project_home/data/eda.{activation_thresh_target.upper()}.{now.strftime('%Y%m%dT%H%M%S')}.TSV",
sep='\t',
header=True,
index=False
)