Skip to content

Instantly share code, notes, and snippets.

@ayushdg
Last active October 16, 2019 20:58
Show Gist options
  • Save ayushdg/ccd723d3ba9dd1aca98860787d1e86ce to your computer and use it in GitHub Desktop.
Save ayushdg/ccd723d3ba9dd1aca98860787d1e86ce to your computer and use it in GitHub Desktop.
import dask_xgboost as dxgb_gpu
import dask
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait
import cudf
import numpy as np
import xgboost as xgb
from collections import OrderedDict
import gc
from glob import glob
import os
import subprocess
import time
def initialize_rmm_pool():
if "0.10" in cudf.__version__:
import rmm
from rmm import rmm_config as rmm_cfg
else:
from librmm_cffi import librmm as rmm, librmm_config as rmm_cfg
rmm.finalize()
rmm_cfg.use_pool_allocator = True
#rmm_cfg.initial_pool_size = 16 << 30 (Default is 1/2 of gpu mem)
rmm.initialize()
def initialize_rmm_no_pool():
if "0.10" in cudf.__version__:
import rmm
from rmm import rmm_config as rmm_cfg
else:
from librmm_cffi import librmm as rmm, librmm_config as rmm_cfg
rmm_cfg.use_pool_allocator = False
rmm.initialize()
def finalize_rmm():
if "0.10" in cudf.__version__:
import rmm
else:
from librmm_cffi import librmm as rmm
rmm.finalize()
def run_dask_task(func, **kwargs):
task = func(**kwargs)
return task
def process_quarter_gpu(year=2000, quarter=1, perf_file="", data_dir = "", client = None, **kwargs):
ml_arrays = run_dask_task(delayed(run_gpu_workflow),
quarter=quarter,
year=year,
perf_file=perf_file)
return client.compute(ml_arrays,
optimize_graph=False,
fifo_timeout="0ms")
def null_workaround(df, **kwargs):
for column, data_type in df.dtypes.items():
if str(data_type) == "category":
df[column] = df[column].astype('int32').fillna(-1)
if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']:
df[column] = df[column].fillna(np.dtype(data_type).type(-1))
return df
def run_gpu_workflow(quarter=1, year=2000, perf_file="", acq_file="", names_file="", **kwargs):
names = gpu_load_names(col_names_path = data_dir + "names.csv")
acq_gdf = gpu_load_acquisition_csv(acquisition_path= data_dir + "acq" + "/Acquisition_"
+ str(year) + "Q" + str(quarter) + ".txt")
acq_gdf = acq_gdf.merge(names, how='left', on=['seller_name'])
acq_gdf.drop_column('seller_name')
acq_gdf['seller_name'] = acq_gdf['new']
acq_gdf.drop_column('new')
perf_df_tmp = gpu_load_performance_csv(perf_file)
gdf = perf_df_tmp
everdf = create_ever_features(gdf)
delinq_merge = create_delinq_features(gdf)
everdf = join_ever_delinq_features(everdf, delinq_merge)
del(delinq_merge)
joined_df = create_joined_df(gdf, everdf)
testdf = create_12_mon_features(joined_df)
joined_df = combine_joined_12_mon(joined_df, testdf)
del(testdf)
perf_df = final_performance_delinquency(gdf, joined_df)
del(gdf, joined_df)
final_gdf = join_perf_acq_gdfs(perf_df, acq_gdf)
del(perf_df)
del(acq_gdf)
final_gdf = last_mile_cleaning(final_gdf)
return final_gdf
def gpu_load_performance_csv(performance_path, **kwargs):
""" Loads performance data
Returns
-------
GPU DataFrame
"""
cols = [
"loan_id", "monthly_reporting_period", "servicer", "interest_rate", "current_actual_upb",
"loan_age", "remaining_months_to_legal_maturity", "adj_remaining_months_to_maturity",
"maturity_date", "msa", "current_loan_delinquency_status", "mod_flag", "zero_balance_code",
"zero_balance_effective_date", "last_paid_installment_date", "foreclosed_after",
"disposition_date", "foreclosure_costs", "prop_preservation_and_repair_costs",
"asset_recovery_costs", "misc_holding_expenses", "holding_taxes", "net_sale_proceeds",
"credit_enhancement_proceeds", "repurchase_make_whole_proceeds", "other_foreclosure_proceeds",
"non_interest_bearing_upb", "principal_forgiveness_upb", "repurchase_make_whole_proceeds_flag",
"foreclosure_principal_write_off_amount", "servicing_activity_indicator"
]
dtypes = OrderedDict([
("loan_id", "int64"),
("monthly_reporting_period", "date"),
("servicer", "category"),
("interest_rate", "float64"),
("current_actual_upb", "float64"),
("loan_age", "float64"),
("remaining_months_to_legal_maturity", "float64"),
("adj_remaining_months_to_maturity", "float64"),
("maturity_date", "date"),
("msa", "float64"),
("current_loan_delinquency_status", "int32"),
("mod_flag", "category"),
("zero_balance_code", "category"),
("zero_balance_effective_date", "date"),
("last_paid_installment_date", "date"),
("foreclosed_after", "date"),
("disposition_date", "date"),
("foreclosure_costs", "float64"),
("prop_preservation_and_repair_costs", "float64"),
("asset_recovery_costs", "float64"),
("misc_holding_expenses", "float64"),
("holding_taxes", "float64"),
("net_sale_proceeds", "float64"),
("credit_enhancement_proceeds", "float64"),
("repurchase_make_whole_proceeds", "float64"),
("other_foreclosure_proceeds", "float64"),
("non_interest_bearing_upb", "float64"),
("principal_forgiveness_upb", "float64"),
("repurchase_make_whole_proceeds_flag", "category"),
("foreclosure_principal_write_off_amount", "float64"),
("servicing_activity_indicator", "category")
])
return cudf.read_csv(performance_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
def gpu_load_acquisition_csv(acquisition_path, **kwargs):
""" Loads acquisition data
Returns
-------
GPU DataFrame
"""
cols = [
'loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term',
'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score',
'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state',
'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type',
'relocation_mortgage_indicator'
]
dtypes = OrderedDict([
("loan_id", "int64"),
("orig_channel", "category"),
("seller_name", "category"),
("orig_interest_rate", "float64"),
("orig_upb", "int64"),
("orig_loan_term", "int64"),
("orig_date", "date"),
("first_pay_date", "date"),
("orig_ltv", "float64"),
("orig_cltv", "float64"),
("num_borrowers", "float64"),
("dti", "float64"),
("borrower_credit_score", "float64"),
("first_home_buyer", "category"),
("loan_purpose", "category"),
("property_type", "category"),
("num_units", "int64"),
("occupancy_status", "category"),
("property_state", "category"),
("zip", "int64"),
("mortgage_insurance_percent", "float64"),
("product_type", "category"),
("coborrow_credit_score", "float64"),
("mortgage_insurance_type", "float64"),
("relocation_mortgage_indicator", "category")
])
return cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
def gpu_load_names(col_names_path = "", **kwargs):
""" Loads names used for renaming the banks
Returns
-------
GPU DataFrame
"""
cols = [
'seller_name', 'new'
]
dtypes = OrderedDict([
("seller_name", "category"),
("new", "category"),
])
return cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), skiprows=1)
def create_ever_features(gdf, **kwargs):
everdf = gdf[['loan_id', 'current_loan_delinquency_status']]
everdf = everdf.groupby('loan_id', method='hash', as_index=False).max()
del(gdf)
everdf['ever_30'] = (everdf['current_loan_delinquency_status'] >= 1).astype('int8')
everdf['ever_90'] = (everdf['current_loan_delinquency_status'] >= 3).astype('int8')
everdf['ever_180'] = (everdf['current_loan_delinquency_status'] >= 6).astype('int8')
everdf.drop_column('current_loan_delinquency_status')
return everdf
def create_delinq_features(gdf, **kwargs):
delinq_gdf = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
del(gdf)
delinq_30 = delinq_gdf.query('current_loan_delinquency_status >= 1')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash', as_index=False).min()
delinq_30['delinquency_30'] = delinq_30['monthly_reporting_period']
delinq_30.drop_column('monthly_reporting_period')
delinq_90 = delinq_gdf.query('current_loan_delinquency_status >= 3')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash', as_index=False).min()
delinq_90['delinquency_90'] = delinq_90['monthly_reporting_period']
delinq_90.drop_column('monthly_reporting_period')
delinq_180 = delinq_gdf.query('current_loan_delinquency_status >= 6')[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash', as_index=False).min()
delinq_180['delinquency_180'] = delinq_180['monthly_reporting_period']
delinq_180.drop_column('monthly_reporting_period')
del(delinq_gdf)
delinq_merge = delinq_30.merge(delinq_90, how='left', on=['loan_id'], type='hash')
delinq_merge['delinquency_90'] = delinq_merge['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
delinq_merge = delinq_merge.merge(delinq_180, how='left', on=['loan_id'], type='hash')
delinq_merge['delinquency_180'] = delinq_merge['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
del(delinq_30)
del(delinq_90)
del(delinq_180)
return delinq_merge
def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs):
everdf = everdf_tmp.merge(delinq_merge, on=['loan_id'], how='left', type='hash')
del(everdf_tmp)
del(delinq_merge)
everdf['delinquency_30'] = everdf['delinquency_30'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
everdf['delinquency_90'] = everdf['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
everdf['delinquency_180'] = everdf['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
return everdf
def create_joined_df(gdf, everdf, **kwargs):
test = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]
del(gdf)
test['timestamp'] = test['monthly_reporting_period']
test.drop_column('monthly_reporting_period')
test['timestamp_month'] = test['timestamp'].dt.month
test['timestamp_year'] = test['timestamp'].dt.year
test['delinquency_12'] = test['current_loan_delinquency_status']
test.drop_column('current_loan_delinquency_status')
test['upb_12'] = test['current_actual_upb']
test.drop_column('current_actual_upb')
test['upb_12'] = test['upb_12'].fillna(999999999)
test['delinquency_12'] = test['delinquency_12'].fillna(-1)
joined_df = test.merge(everdf, how='left', on=['loan_id'], type='hash')
del(everdf)
del(test)
joined_df['ever_30'] = joined_df['ever_30'].fillna(-1)
joined_df['ever_90'] = joined_df['ever_90'].fillna(-1)
joined_df['ever_180'] = joined_df['ever_180'].fillna(-1)
joined_df['delinquency_30'] = joined_df['delinquency_30'].fillna(-1)
joined_df['delinquency_90'] = joined_df['delinquency_90'].fillna(-1)
joined_df['delinquency_180'] = joined_df['delinquency_180'].fillna(-1)
joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32')
joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32')
return joined_df
def create_12_mon_features(joined_df, **kwargs):
testdfs = []
n_months = 12
for y in range(1, n_months + 1):
tmpdf = joined_df[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']]
tmpdf['josh_months'] = tmpdf['timestamp_year'] * 12 + tmpdf['timestamp_month']
tmpdf['josh_mody_n'] = ((tmpdf['josh_months'].astype('float64') - 24000 - y) / 12).floor()
tmpdf = tmpdf.groupby(['loan_id', 'josh_mody_n'], method='hash', as_index=False).agg({'delinquency_12': 'max','upb_12': 'min'})
tmpdf['delinquency_12'] = (tmpdf['delinquency_12']>3).astype('int32')
tmpdf['delinquency_12'] +=(tmpdf['upb_12']==0).astype('int32')
tmpdf['timestamp_year'] = (((tmpdf['josh_mody_n'] * n_months) + 24000 + (y - 1)) / 12).floor().astype('int16')
tmpdf['timestamp_month'] = np.int8(y)
tmpdf.drop_column('josh_mody_n')
testdfs.append(tmpdf)
del(tmpdf)
del(joined_df)
return cudf.concat(testdfs)
def combine_joined_12_mon(joined_df, testdf, **kwargs):
joined_df.drop_column('delinquency_12')
joined_df.drop_column('upb_12')
joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')
joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')
return joined_df.merge(testdf, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')
def final_performance_delinquency(gdf, joined_df, **kwargs):
merged = null_workaround(gdf)
#merged = gdf
joined_df = null_workaround(joined_df)
joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')
joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')
merged['timestamp_month'] = merged['monthly_reporting_period'].dt.month
merged['timestamp_month'] = merged['timestamp_month'].astype('int8')
merged['timestamp_year'] = merged['monthly_reporting_period'].dt.year
merged['timestamp_year'] = merged['timestamp_year'].astype('int16')
merged = merged.merge(joined_df, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'], type='hash')
merged.drop_column('timestamp_year')
merged.drop_column('timestamp_month')
return merged
def join_perf_acq_gdfs(perf, acq, **kwargs):
perf = null_workaround(perf)
acq = null_workaround(acq)
return perf.merge(acq, how='left', on=['loan_id'], type='hash')
def last_mile_cleaning(df, **kwargs):
drop_list = [
'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'
]
for column in drop_list:
df.drop_column(column)
for col, dtype in df.dtypes.iteritems():
if str(dtype)=='category':
df[col] = df[col].cat.codes
df[col] = df[col].astype('float32')
df['delinquency_12'] = df['delinquency_12'] > 0
df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
for column in df.columns:
df[column] = df[column].fillna(np.dtype(str(df[column].dtype)).type(-1))
return df.to_arrow(preserve_index=False)
def xgb_training(part_count, gpu_dfs, client = None):
dxgb_gpu_params = {
'nround': 100,
'max_depth': 8,
'max_leaves': 2**8,
'alpha': 0.9,
'eta': 0.1,
'gamma': 0.1,
'learning_rate': 0.1,
'subsample': 1,
'reg_lambda': 1,
'scale_pos_weight': 2,
'min_child_weight': 30,
'tree_method': 'gpu_hist',
'loss': 'ls',
'objective': 'binary:logistic',
'max_features': 'auto',
'criterion': 'friedman_mse',
'grow_policy': 'lossguide',
'verbose': True
}
print(f"Preparing data for training with part count: {part_count}")
t1 = time.time()
gpu_dfs = [delayed(cudf.DataFrame.from_arrow)(gpu_df) for gpu_df in gpu_dfs[:part_count]]
gpu_dfs = [gpu_df for gpu_df in gpu_dfs]
wait(gpu_dfs)
tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
new_map = {}
for key, value in tmp_map:
if value not in new_map:
new_map[value] = [key]
else:
new_map[value].append(key)
del(tmp_map)
gpu_dfs = []
for list_delayed in new_map.values():
gpu_dfs.append(delayed(cudf.concat)(list_delayed))
del(new_map)
gpu_dfs = [(gpu_df[['delinquency_12']], gpu_df[delayed(list)(gpu_df.columns.difference(['delinquency_12']))]) for gpu_df in gpu_dfs]
gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist()) for gpu_df in gpu_dfs]
gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in gpu_dfs]
gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
gc.collect()
wait(gpu_dfs)
print(f"Time taken to prepare data for XGB training {time.time()-t1} s")
print("Training model")
t1 = time.time()
labels = None
print("XGB training for part_count:{}".format(part_count))
bst = dxgb_gpu.train(client, dxgb_gpu_params, gpu_dfs, labels, num_boost_round=dxgb_gpu_params['nround'])
print(f"Time taken to train XGB model {time.time()-t1} s")
return bst
def run_etl(start_year, end_year, data_dir, client):
print("Starting ETL")
t1 = time.time()
perf_data_path = data_dir + "perf/"
gpu_dfs = []
gpu_time = 0
quarter = 1
year = start_year
count = 0
while year <= end_year:
for file in glob(os.path.join(perf_data_path + "/Performance_" + str(year) + "Q" + str(quarter) + "*")):
gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file, client=client))
count += 1
quarter += 1
if quarter == 5:
year += 1
quarter = 1
print("ETL for start_year:{} and end_year:{}\n".format(start_year,end_year))
wait(gpu_dfs)
print(f"Time taken to run ETL from {start_year} to {end_year} was {time.time()-t1} s")
return gpu_dfs
def set_NCCL_variable():
os.environ["NCCL_P2P_DISABLE"] = "1"
if __name__ == "__main__":
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]
cluster = LocalCUDACluster(ip=IPADDR, local_directory="/raid/dask-space/")
client = Client(cluster)
#client = Client(IPADDR+":8786")
client.run(set_NCCL_variable)
data_dir = "/raid/mortgage2000-2016_1gb/"
#data_dir = "/raid/mortgage_unsplit/"
start_year = 2000
end_year = 2016
part_count = 48
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.get('distributed.scheduler.work-stealing')
dask.config.set({'distributed.scheduler.bandwidth': 1})
dask.config.get('distributed.scheduler.bandwidth')
client.run(initialize_rmm_pool)
etl_result = run_etl(start_year, end_year, data_dir, client)
client.run(finalize_rmm)
client.run(initialize_rmm_no_pool)
model = xgb_training(part_count, etl_result, client)
client.close()
cluster.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment