-
-
Save reedv/169856f9442354a404fc0e1e0d3e8aa8 to your computer and use it in GitHub Desktop.
Example script for training a random forest with the H2O Python module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
# include config file location | |
import traceback | |
PROJECT_HOME = '/path/to/my/project_home' | |
sys.path.insert(0, PROJECT_HOME) | |
import time | |
import numpy as np | |
import re | |
from IPython.display import display | |
import pprint | |
import os | |
import subprocess | |
import h2o | |
# Standalone testing script for benchmarking algo using common dataset | |
# https://archive.ics.uci.edu/ml/datasets/Car+Evaluation | |
def quit_program(h2o_thread):
    """
    Shut down the H2O cluster this script connected to, if its output reader is still running.

    If you don't close the h2o cluster explicitly, it stays open even if the script itself fails.

    :param h2o_thread: the thread started to read the H2O process output, or None when
        startup never got that far (in which case nothing is shut down).
    :return: None
    """
    print()
    # Thread.isAlive() was removed in Python 3.9 (and referencing it without calling it
    # is always truthy); is_alive() is the supported liveness check.
    if h2o_thread is None or not h2o_thread.is_alive():
        return
    # h2o.cluster() is None when no connection was ever made (e.g. startup failed
    # before h2o.init), so guard before asking it to shut down.
    cluster = h2o.cluster()
    if cluster is not None:
        cluster.shutdown()
print('h2o python package version: %s' % h2o.__version__)
# TODO load in dataset and prep
# NOTE(review): pyodbc is not used anywhere in this script (the data comes from a CSV);
# kept as-is, but it looks safe to drop — confirm.
import pyodbc
import pandas as pd
pd.set_option('display.max_columns', None)
# NOTE(review): the UCI car.data file is distributed without a header row; header=0 assumes
# this local copy has a header line (buying,maint,doors,persons,lug_boot,safety,value), which
# the later column references require. If it does not, use header=None, names=[...] — confirm.
raw_data = pd.read_csv("/path/to/benchmark/data/car.data", header=0)
# inspect data
print('frame shape = {}'.format(raw_data.shape))
print(raw_data.dtypes)
display(raw_data.head(n=5))
# DataFrame.info() writes its report itself and returns None, so it is called directly
# instead of being wrapped in display() (which would just show "None").
raw_data.info(verbose=True)
display(raw_data.describe())
# the rest of the script works on frame_in
frame_in = raw_data
# TODO h2o activation | |
# startup hadoop h2o cluster | |
import shlex | |
import re | |
from queue import Queue, Empty | |
from threading import Thread | |
def enqueue_output(out, queue):
    """
    Forward every line of a blocking binary stream into ``queue``, then close the stream.

    Meant to run on its own thread so the main thread can poll the queue instead of
    blocking on the pipe. Stops at EOF, i.e. when ``readline`` returns ``b''``.
    see https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
    """
    while True:
        chunk = out.readline()
        if chunk == b'':
            break
        queue.put(chunk)
    out.close()
# Show only the environment variables most likely to matter for launching the java / hadoop
# command below. Dumping all of os.environ copies tokens and credentials into notebook
# output and logs (and into anything shared from them).
for env_name in ('JAVA_HOME', 'PATH', 'HADOOP_HOME', 'HADOOP_CONF_DIR'):
    print('%s=%s' % (env_name, os.environ.get(env_name)))
# TODO: clear the legacy H2O temp dir before startup (previously a commented-out call that
# relied on an undefined CFG config object: CFG.h2o_legacy_dir / CFG.startup_cmds).
# Matches the line H2O prints once its node is reachable, e.g.
# "Open H2O Flow in your web browser: http://10.0.0.5:54321", capturing host and port.
_FLOW_URL_RE = re.compile(r'Open H2O Flow in your web browser:\s*https?://([^:/\s]+):(\d+)')


def _wait_for_flow_url(proc, reader, line_queue, poll_secs=0.1):
    """Echo the H2O process output until it announces its Flow URL; return that line.

    :param proc: the ``subprocess.Popen`` running the H2O jar
    :param reader: the thread pumping ``proc``'s output into ``line_queue``
    :param line_queue: queue of raw ``bytes`` output lines
    :param poll_secs: how long each ``Queue.get`` waits before re-checking the reader
    :return: the decoded output line containing the Flow URL
    :raises RuntimeError: if the output ends before the URL line appears
    Exits the script with status 1 when an output line mentions "error" (case-insensitive).
    """
    while True:
        try:
            # A timed get sleeps while idle instead of spinning a CPU core.
            raw_line = line_queue.get(timeout=poll_secs)
        except Empty:
            # Once the reader thread has finished, every line is already queued, so an
            # empty queue then means the process ended without printing its URL.
            if not reader.is_alive() and line_queue.empty():
                raise RuntimeError(
                    'H2O process output ended (exit code %s) before its Flow URL was printed'
                    % proc.poll())
            continue
        line = raw_line.decode('utf-8', errors='replace')
        print(line)
        # check for first instance connection url output
        if 'Open H2O Flow in your web browser' in line:
            return line
        if re.search('error', line, re.IGNORECASE) is not None:
            print('Error generated: %s' % line)
            print('Shutting down h2o cluster')
            # No h2o connection exists yet; the JVM itself is stopped by the finally
            # clause of the try statement below, which sys.exit() still runs.
            print('exiting program')
            sys.exit(1)


def _parse_h2o_address(url_line):
    """Return ``(host, port)`` from H2O's "Open H2O Flow in your web browser: http://host:port" line.

    :raises ValueError: if ``url_line`` contains no such URL
    """
    match = _FLOW_URL_RE.search(url_line)
    if match is None:
        raise ValueError('Could not parse H2O address from line: %r' % url_line)
    return match.group(1), int(match.group(2))


# Defined before the try so the finally clause can always reference them, even when
# the launch itself fails (quit_program(None) is a no-op).
startup_p = None
t = None
try:
    # start h2o service in background thread
    HADOOP_H2O_JAR = os.path.join(PROJECT_HOME, 'resources', 'h2o-3.32.1.3-hdp3.1', 'h2odriver.jar')
    # alternative launcher for running H2O on Hadoop/YARN (not used below)
    HADOOP_STARTUP_CMD = '/bin/hadoop jar {} -nodes 2 -mapperXmx 4g -timeout 300'.format(HADOOP_H2O_JAR)
    LOCAL_H2O_JAR = os.path.join(PROJECT_HOME, 'resources', 'h2o-3.32.1.3', 'h2o.jar')
    LOCAL_STARTUP_CMD = 'java -Xmx8g -jar {}'.format(LOCAL_H2O_JAR)
    # stderr is merged into stdout: a stderr PIPE that nobody reads can fill up and stall
    # the JVM, and merging lets the "error" check also see what the JVM writes there.
    startup_p = subprocess.Popen(shlex.split(LOCAL_STARTUP_CMD),
                                 shell=False,
                                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # setup message passing queue
    # NOTE(review): q is not drained after startup, so H2O log lines accumulate in
    # memory for the rest of the run.
    q = Queue()
    t = Thread(target=enqueue_output, args=(startup_p.stdout, q))
    t.daemon = True  # thread dies with the program
    t.start()

    h2o_url_out = _wait_for_flow_url(startup_p, t, q)
    print('Connection url output line: %s' % h2o_url_out)
    h2o_cnxn_ip, h2o_cnxn_port = _parse_h2o_address(h2o_url_out)
    print('H2O connection ip: %s' % h2o_cnxn_ip)
    # connect to h2o service at the host *and* port it announced
    h2o.init(ip=h2o_cnxn_ip, port=h2o_cnxn_port)
    h2o.cluster().show_status()
    h2o.ls()

    # TODO dx_outcome training test
    display(frame_in['value'].value_counts())
    display(frame_in.head(n=3))

    # convert data to h2o workable structure
    # see http://h2o-release.s3.amazonaws.com/h2o/master/3494/docs-website/h2o-py/docs/data.html#loading-data-from-a-python-object
    print('Converting local python data set to H2O data frame')
    convertible = isinstance(frame_in, (tuple, list, dict, pd.DataFrame))
    print('Checking that input data struct is convertible to H2OFrame: %s' % convertible)
    assert convertible
    COLUMN_TYPES = {
        'buying': 'enum',
        'maint': 'enum',
        'doors': 'enum',
        'persons': 'enum',
        'lug_boot': 'enum',
        'safety': 'enum',
        'value': 'enum',
    }
    frame_in_h2o = h2o.H2OFrame(
        # shuffling training dataset as well (fixed seed for reproducibility)
        frame_in.sample(frac=1, random_state=64).reset_index(drop=True),
        column_types=COLUMN_TYPES)
    print('H2O dataframe diagnostics:')
    print('types: %s' % frame_in_h2o.types)
    print('row count: %d' % frame_in_h2o.nrow)
    print(len(frame_in_h2o))
    display(frame_in_h2o.head(rows=3))

    version = 0.01
    train_response = 'value'
    train_features = [label for label in COLUMN_TYPES if label != train_response]
    drf_dx = h2o.estimators.H2ORandomForestEstimator(
        # denoting update version name by epoch timestamp
        model_id=f'model_v{version}T{int(time.time())}',
        response_column=train_response,
        # weights_column='weight',  # Balance classes and observation weights are not currently supported together
        ntrees=64,
        nbins=32,
        balance_classes=True,
        binomial_double_trees=True)
    print(drf_dx.model_id)
    # __module__/__name__ are read from the class: an estimator instance has no __name__.
    print((type(drf_dx).__module__, type(drf_dx).__name__))
    # seeded so the 80/20 split is reproducible, like the shuffle above
    train_u, val_u = frame_in_h2o.split_frame(ratios=[0.80], seed=64)
    print(train_features)
    print(train_response)
    display(train_u.head(rows=3))
    display(val_u.head(rows=3))
    max_train_time_hrs = 3
    drf_dx.train(x=train_features, y=train_response,
                 training_frame=train_u, validation_frame=val_u,
                 max_runtime_secs=max_train_time_hrs * 60 * 60)
    pprint.pprint(drf_dx.params['response_column'])
    pprint.pprint(drf_dx.params)
    print('\n\nParams...\n\n')
    # get_params must be called; printing the attribute only shows the bound method.
    print(drf_dx.get_params())
    # NOTE(review): confusion_matrix(metrics=..., valid=True) and find_threshold_by_max_metric
    # are binomial-model APIs; with the multi-class car 'value' response they may be
    # unavailable — confirm against the installed h2o version.
    print(drf_dx.confusion_matrix(metrics=['f0point5', 'f1', 'f2', 'accuracy'], train=False, valid=True))
    # fitting on the data to save results
    predictions = drf_dx.predict(val_u)
    print(predictions.describe())
    # activation_thresh_target = 'accuracy'  # most straight forward
    activation_thresh_target = 'f0point5'  # reduce FPs, more conservative (?)
    # activation_thresh_target = 'f2'  # reduce FNs, more aggressive (?)
    activation_thresh = drf_dx.find_threshold_by_max_metric(activation_thresh_target, valid=True)
    # in the case of this dataset, there are multiple classes as opposed to just binary, but this is
    # just for demo purposes; the classes come from the actual response column (train_response)
    response_levels = frame_in_h2o[train_response].categories()
    neg_class, pos_class = response_levels[0], response_levels[-1]
    print(f"Activation thresh: {activation_thresh}, neg class: {neg_class}, pos class: {pos_class}")
    # adjust for target threshold and convert class labels to 0/1 for the sklearn checks below
    predictions['predict'] = (predictions[pos_class] > activation_thresh).ifelse(pos_class, neg_class)
    predictions['predicted_index'] = (predictions[pos_class] > activation_thresh).ifelse(1, 0)
    predictions['true_label'] = val_u[train_response]
    predictions['true_index'] = (predictions['true_label'] == pos_class).ifelse(1, 0)
    display(predictions.head())
    frame_w_preds = val_u.cbind(predictions)
    print(frame_w_preds.describe())
    predictions_df = frame_w_preds.as_data_frame()
except Exception:
    # Report the real failure once and stop: everything after this try needs
    # predictions_df, which does not exist when any step above failed.
    traceback.print_exc()
    sys.exit(1)
finally:
    # Runs exactly once on every path (success, exception, sys.exit).
    # TODO this does not actually fully kill the yarn application
    # (need to do manually: yarn application -kill application_<app ID>)
    quit_program(t)
    if startup_p is not None and startup_p.poll() is None:
        # Give the JVM a moment to exit after the shutdown request, then stop it
        # directly (also covers failures before h2o.init ever connected).
        try:
            startup_p.wait(timeout=10)
        except subprocess.TimeoutExpired:
            startup_p.terminate()
# double checking results on validation set
# Imported here because nothing earlier in the file imports these modules.
import datetime

import sklearn.metrics

y_true = predictions_df['true_index']
y_pred = predictions_df['predicted_index']
# Ranking metrics (ROC AUC, PR AUC) need a continuous score. Use the H2O probability
# column for pos_class rather than the thresholded 0/1 labels, which would reduce
# each curve to a single operating point.
y_score = predictions_df[pos_class]
print('ROC AUC')
print(sklearn.metrics.roc_auc_score(y_true=y_true, y_score=y_score))
print('PR AUC')
print(sklearn.metrics.average_precision_score(y_true=y_true, y_score=y_score, pos_label=1))
print('Accuracy')
print(sklearn.metrics.accuracy_score(y_true=y_true, y_pred=y_pred))
print('F1')
print(sklearn.metrics.f1_score(y_true=y_true, y_pred=y_pred, pos_label=1))
print('Confusion Matrix')
print(sklearn.metrics.confusion_matrix(y_true=y_true, y_pred=y_pred))
display(predictions_df.head())
now = datetime.datetime.now()
# Same location as before, built from PROJECT_HOME instead of repeating the literal path.
predictions_df.to_csv(
    os.path.join(PROJECT_HOME, 'data',
                 f"eda.{activation_thresh_target.upper()}.{now.strftime('%Y%m%dT%H%M%S')}.TSV"),
    sep='\t',
    header=True,
    index=False,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment