Created
August 17, 2020 06:15
-
-
Save lazykyama/6131cefefa127662c6751d1e8b92ed6e to your computer and use it in GitHub Desktop.
Reproduction code to verify an issue related to Spark 3.0 and Yosegi Spark.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# This script is a part of the example below. | |
# https://github.com/NVIDIA/spark-xgboost-examples/blob/51df0f3d77d6d6d5d0a8a3329258376b1d59eda5/datasets/ETL/MortgageETL.ipynb | |
# For convenience, this script is created from original notebook, separately. | |
# The purpose of this script is to confirm a bug related to yosegi-spark (https://github.com/yahoojapan/yosegi-spark). | |
# | |
import datetime | |
import time | |
from pyspark import broadcast | |
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import * | |
from pyspark.sql.types import * | |
from pyspark.sql.window import Window | |
# Single SparkSession shared by the whole transcoding script.
spark = SparkSession.builder.appName("MortgageETL").getOrCreate()
# Schema of the raw Performance CSV files (pipe-delimited, no header).
_csv_perf_schema = StructType([
    StructField(field_name, field_type) for field_name, field_type in [
        ('loan_id', LongType()),
        ('monthly_reporting_period', StringType()),
        ('servicer', StringType()),
        ('interest_rate', DoubleType()),
        ('current_actual_upb', DoubleType()),
        ('loan_age', DoubleType()),
        ('remaining_months_to_legal_maturity', DoubleType()),
        ('adj_remaining_months_to_maturity', DoubleType()),
        ('maturity_date', StringType()),
        ('msa', DoubleType()),
        ('current_loan_delinquency_status', IntegerType()),
        ('mod_flag', StringType()),
        ('zero_balance_code', StringType()),
        ('zero_balance_effective_date', StringType()),
        ('last_paid_installment_date', StringType()),
        ('foreclosed_after', StringType()),
        ('disposition_date', StringType()),
        ('foreclosure_costs', DoubleType()),
        ('prop_preservation_and_repair_costs', DoubleType()),
        ('asset_recovery_costs', DoubleType()),
        ('misc_holding_expenses', DoubleType()),
        ('holding_taxes', DoubleType()),
        ('net_sale_proceeds', DoubleType()),
        ('credit_enhancement_proceeds', DoubleType()),
        ('repurchase_make_whole_proceeds', StringType()),
        ('other_foreclosure_proceeds', DoubleType()),
        ('non_interest_bearing_upb', DoubleType()),
        ('principal_forgiveness_upb', StringType()),
        ('repurchase_make_whole_proceeds_flag', StringType()),
        ('foreclosure_principal_write_off_amount', StringType()),
        ('servicing_activity_indicator', StringType()),
    ]
])
# Schema of the raw Acquisition CSV files (pipe-delimited, no header).
_csv_acq_schema = StructType([
    StructField(field_name, field_type) for field_name, field_type in [
        ('loan_id', LongType()),
        ('orig_channel', StringType()),
        ('seller_name', StringType()),
        ('orig_interest_rate', DoubleType()),
        ('orig_upb', IntegerType()),
        ('orig_loan_term', IntegerType()),
        ('orig_date', StringType()),
        ('first_pay_date', StringType()),
        ('orig_ltv', DoubleType()),
        ('orig_cltv', DoubleType()),
        ('num_borrowers', DoubleType()),
        ('dti', DoubleType()),
        ('borrower_credit_score', DoubleType()),
        ('first_home_buyer', StringType()),
        ('loan_purpose', StringType()),
        ('property_type', StringType()),
        ('num_units', IntegerType()),
        ('occupancy_status', StringType()),
        ('property_state', StringType()),
        ('zip', IntegerType()),
        ('mortgage_insurance_percent', DoubleType()),
        ('product_type', StringType()),
        ('coborrow_credit_score', DoubleType()),
        ('mortgage_insurance_type', DoubleType()),
        ('relocation_mortgage_indicator', StringType()),
    ]
])
def _get_quarter_from_csv_file_name():
    """Return a column expression extracting the quarter tag from the input file name.

    E.g. '.../Performance_2000Q1.txt' -> '2000Q1' (text after the last '_' of
    the file name with its extension stripped).
    """
    stem = substring_index(input_file_name(), '.', 1)
    return substring_index(stem, '_', -1)
def _read_mortgage_csv(spark, path, schema):
    """Read a pipe-delimited, headerless mortgage CSV with the given schema.

    Empty strings are read as nulls, and a 'quarter' column is derived from
    the input file name. Shared implementation for read_perf_csv and
    read_acq_csv, which were previously duplicated line for line.
    """
    return spark.read.format('csv') \
        .option('nullValue', '') \
        .option('header', 'false') \
        .option('delimiter', '|') \
        .schema(schema) \
        .load(path) \
        .withColumn('quarter', _get_quarter_from_csv_file_name())

def read_perf_csv(spark, path):
    """Load the Performance dataset at *path* as a DataFrame."""
    return _read_mortgage_csv(spark, path, _csv_perf_schema)

def read_acq_csv(spark, path):
    """Load the Acquisition dataset at *path* as a DataFrame."""
    return _read_mortgage_csv(spark, path, _csv_acq_schema)
# Source CSVs and destinations for the transcoded parquet / yosegi copies.
orig_acq_path = '/data/dataset/mortgage/mortgage_2000-2001/acq/Acquisition_2000Q1.txt'
orig_perf_path = '/data/dataset/mortgage/mortgage_2000-2001/perf/Performance_2000Q1.txt'
out_acq_parquet_path = '/data/dataset/mortgage/yosegi_repro/parquet/acq.parquet'
out_perf_parquet_path = '/data/dataset/mortgage/yosegi_repro/parquet/perf.parquet'
out_acq_yosegi_path = '/data/dataset/mortgage/yosegi_repro/yosegi/acq.yosegi'
out_perf_yosegi_path = '/data/dataset/mortgage/yosegi_repro/yosegi/perf.yosegi'

# Read the CSVs, then transcode to parquet and yosegi, printing the wall-clock
# time each stage takes.
start = time.time()
acq = read_acq_csv(spark, orig_acq_path)
perf = read_perf_csv(spark, orig_perf_path)
print(f'loading: {time.time() - start}')

start = time.time()
acq.write.format('parquet').save(out_acq_parquet_path)
perf.write.format('parquet').save(out_perf_parquet_path)
print(f'exporting to parquet: {time.time() - start}')

start = time.time()
acq.write.format('jp.co.yahoo.yosegi.spark.YosegiFileFormat').save(out_acq_yosegi_path)
perf.write.format('jp.co.yahoo.yosegi.spark.YosegiFileFormat').save(out_perf_yosegi_path)
print(f'exporting to yosegi: {time.time() - start}')

spark.stop()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# This script is a part of the example below. | |
# https://github.com/NVIDIA/spark-xgboost-examples/blob/51df0f3d77d6d6d5d0a8a3329258376b1d59eda5/datasets/ETL/MortgageETL.ipynb | |
# For convenience, this script is created from original notebook, separately. | |
# The purpose of this script is to confirm a bug related to yosegi-spark (https://github.com/yahoojapan/yosegi-spark). | |
# | |
import argparse | |
import datetime | |
import os | |
import time | |
from pyspark import broadcast | |
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import * | |
from pyspark.sql.types import * | |
from pyspark.sql.window import Window | |
# name mappings | |
_name_mapping = [ | |
("WITMER FUNDING, LLC", "Witmer"), | |
("WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015", "Wells Fargo"), | |
("WELLS FARGO BANK, NA" , "Wells Fargo"), | |
("WELLS FARGO BANK, N.A." , "Wells Fargo"), | |
("WELLS FARGO BANK, NA" , "Wells Fargo"), | |
("USAA FEDERAL SAVINGS BANK" , "USAA"), | |
("UNITED SHORE FINANCIAL SERVICES, LLC D\\/B\\/A UNITED WHOLESALE MORTGAGE" , "United Seq(e"), | |
("U.S. BANK N.A." , "US Bank"), | |
("SUNTRUST MORTGAGE INC." , "Suntrust"), | |
("STONEGATE MORTGAGE CORPORATION" , "Stonegate Mortgage"), | |
("STEARNS LENDING, LLC" , "Stearns Lending"), | |
("STEARNS LENDING, INC." , "Stearns Lending"), | |
("SIERRA PACIFIC MORTGAGE COMPANY, INC." , "Sierra Pacific Mortgage"), | |
("REGIONS BANK" , "Regions"), | |
("RBC MORTGAGE COMPANY" , "RBC"), | |
("QUICKEN LOANS INC." , "Quicken Loans"), | |
("PULTE MORTGAGE, L.L.C." , "Pulte Mortgage"), | |
("PROVIDENT FUNDING ASSOCIATES, L.P." , "Provident Funding"), | |
("PROSPECT MORTGAGE, LLC" , "Prospect Mortgage"), | |
("PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC" , "Principal Residential"), | |
("PNC BANK, N.A." , "PNC"), | |
("PMT CREDIT RISK TRANSFER TRUST 2015-2" , "PennyMac"), | |
("PHH MORTGAGE CORPORATION" , "PHH Mortgage"), | |
("PENNYMAC CORP." , "PennyMac"), | |
("PACIFIC UNION FINANCIAL, LLC" , "Other"), | |
("OTHER" , "Other"), | |
("NYCB MORTGAGE COMPANY, LLC" , "NYCB"), | |
("NEW YORK COMMUNITY BANK" , "NYCB"), | |
("NETBANK FUNDING SERVICES" , "Netbank"), | |
("NATIONSTAR MORTGAGE, LLC" , "Nationstar Mortgage"), | |
("METLIFE BANK, NA" , "Metlife"), | |
("LOANDEPOT.COM, LLC" , "LoanDepot.com"), | |
("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1" , "JP Morgan Chase"), | |
("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1" , "JP Morgan Chase"), | |
("JPMORGAN CHASE BANK, NATIONAL ASSOCIATION" , "JP Morgan Chase"), | |
("JPMORGAN CHASE BANK, NA" , "JP Morgan Chase"), | |
("JP MORGAN CHASE BANK, NA" , "JP Morgan Chase"), | |
("IRWIN MORTGAGE, CORPORATION" , "Irwin Mortgage"), | |
("IMPAC MORTGAGE CORP." , "Impac Mortgage"), | |
("HSBC BANK USA, NATIONAL ASSOCIATION" , "HSBC"), | |
("HOMEWARD RESIDENTIAL, INC." , "Homeward Mortgage"), | |
("HOMESTREET BANK" , "Other"), | |
("HOMEBRIDGE FINANCIAL SERVICES, INC." , "HomeBridge"), | |
("HARWOOD STREET FUNDING I, LLC" , "Harwood Mortgage"), | |
("GUILD MORTGAGE COMPANY" , "Guild Mortgage"), | |
("GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)" , "GMAC"), | |
("GMAC MORTGAGE, LLC" , "GMAC"), | |
("GMAC (USAA)" , "GMAC"), | |
("FREMONT BANK" , "Fremont Bank"), | |
("FREEDOM MORTGAGE CORP." , "Freedom Mortgage"), | |
("FRANKLIN AMERICAN MORTGAGE COMPANY" , "Franklin America"), | |
("FLEET NATIONAL BANK" , "Fleet National"), | |
("FLAGSTAR CAPITAL MARKETS CORPORATION" , "Flagstar Bank"), | |
("FLAGSTAR BANK, FSB" , "Flagstar Bank"), | |
("FIRST TENNESSEE BANK NATIONAL ASSOCIATION" , "Other"), | |
("FIFTH THIRD BANK" , "Fifth Third Bank"), | |
("FEDERAL HOME LOAN BANK OF CHICAGO" , "Fedral Home of Chicago"), | |
("FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB" , "FDIC"), | |
("DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A." , "Downey Mortgage"), | |
("DITECH FINANCIAL LLC" , "Ditech"), | |
("CITIMORTGAGE, INC." , "Citi"), | |
("CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY" , "Chicago Mortgage"), | |
("CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY" , "Chicago Mortgage"), | |
("CHASE HOME FINANCE, LLC" , "JP Morgan Chase"), | |
("CHASE HOME FINANCE FRANKLIN AMERICAN MORTGAGE COMPANY" , "JP Morgan Chase"), | |
("CHASE HOME FINANCE (CIE 1)" , "JP Morgan Chase"), | |
("CHASE HOME FINANCE" , "JP Morgan Chase"), | |
("CASHCALL, INC." , "CashCall"), | |
("CAPITAL ONE, NATIONAL ASSOCIATION" , "Capital One"), | |
("CALIBER HOME LOANS, INC." , "Caliber Funding"), | |
("BISHOPS GATE RESIDENTIAL MORTGAGE TRUST" , "Bishops Gate Mortgage"), | |
("BANK OF AMERICA, N.A." , "Bank of America"), | |
("AMTRUST BANK" , "AmTrust"), | |
("AMERISAVE MORTGAGE CORPORATION" , "Amerisave"), | |
("AMERIHOME MORTGAGE COMPANY, LLC" , "AmeriHome Mortgage"), | |
("ALLY BANK" , "Ally Bank"), | |
("ACADEMY MORTGAGE CORPORATION" , "Academy Mortgage"), | |
("NO CASH-OUT REFINANCE" , "OTHER REFINANCE"), | |
("REFINANCE - NOT SPECIFIED" , "OTHER REFINANCE"), | |
("Other REFINANCE" , "OTHER REFINANCE")] | |
# Categorical (string) feature columns; these are dictionary-encoded to
# integers by _cast_string_columns_to_numeric.
cate_col_names = (
    "orig_channel first_home_buyer loan_purpose property_type "
    "occupancy_status property_state relocation_mortgage_indicator "
    "seller_name mod_flag"
).split()
# Numeric feature columns; the label column is appended last so it is
# included in the final select of run_mortgage.
label_col_name = "delinquency_12"
numeric_col_names = (
    "orig_interest_rate orig_upb orig_loan_term orig_ltv orig_cltv "
    "num_borrowers dti borrower_credit_score num_units zip "
    "mortgage_insurance_percent current_loan_delinquency_status "
    "current_actual_upb interest_rate loan_age msa non_interest_bearing_upb"
).split() + [label_col_name]
all_col_names = cate_col_names + numeric_col_names | |
def _parse_dates(perf):
    """Parse the string date columns of the performance table into DateType.

    Also derives month/year/day columns from monthly_reporting_period
    (parsed first, since the derived columns read the parsed value).
    """
    parsed = perf.withColumn(
        'monthly_reporting_period',
        to_date(col('monthly_reporting_period'), 'MM/dd/yyyy'))
    parsed = parsed \
        .withColumn('monthly_reporting_period_month', month(col('monthly_reporting_period'))) \
        .withColumn('monthly_reporting_period_year', year(col('monthly_reporting_period'))) \
        .withColumn('monthly_reporting_period_day', dayofmonth(col('monthly_reporting_period')))
    # Remaining date columns: (column name, source format).
    for date_col, fmt in (
            ('last_paid_installment_date', 'MM/dd/yyyy'),
            ('foreclosed_after', 'MM/dd/yyyy'),
            ('disposition_date', 'MM/dd/yyyy'),
            ('maturity_date', 'MM/yyyy'),
            ('zero_balance_effective_date', 'MM/yyyy')):
        parsed = parsed.withColumn(date_col, to_date(col(date_col), fmt))
    return parsed
def _create_perf_deliquency(spark, perf):
    """Build per-loan delinquency features and join them back onto *perf*.

    Expects *perf* to have passed through _parse_dates (so
    monthly_reporting_period_month/_year exist). Returns the performance rows
    augmented with ever_30/ever_90/ever_180 flags, the earliest 30/90/180-day
    delinquency dates, and delinquency_12 / upb_12 aggregated over 12-month
    windows. (Function name keeps the original "deliquency" spelling so
    existing callers keep working.)
    """
    # Per (quarter, loan_id): worst delinquency status ever seen, and the
    # earliest reporting period at which the loan reached status >=1 / >=3 / >=6.
    aggDF = perf.select(
        col("quarter"),
        col("loan_id"),
        col("current_loan_delinquency_status"),
        when(col("current_loan_delinquency_status") >= 1, col("monthly_reporting_period")).alias("delinquency_30"),
        when(col("current_loan_delinquency_status") >= 3, col("monthly_reporting_period")).alias("delinquency_90"),
        when(col("current_loan_delinquency_status") >= 6, col("monthly_reporting_period")).alias("delinquency_180")) \
        .groupBy("quarter", "loan_id") \
        .agg(
            # NOTE: max/min here are pyspark.sql.functions (star import), not builtins.
            max("current_loan_delinquency_status").alias("delinquency_12"),
            min("delinquency_30").alias("delinquency_30"),
            min("delinquency_90").alias("delinquency_90"),
            min("delinquency_180").alias("delinquency_180")) \
        .select(
            col("quarter"),
            col("loan_id"),
            # Boolean "did the loan ever reach 30/90/180-day delinquency".
            (col("delinquency_12") >= 1).alias("ever_30"),
            (col("delinquency_12") >= 3).alias("ever_90"),
            (col("delinquency_12") >= 6).alias("ever_180"),
            col("delinquency_30"),
            col("delinquency_90"),
            col("delinquency_180"))
    # Monthly performance rows (renamed to timestamp/delinquency_12/upb_12)
    # with the per-loan aggregates attached via left outer join.
    joinedDf = perf \
        .withColumnRenamed("monthly_reporting_period", "timestamp") \
        .withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
        .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
        .withColumnRenamed("current_loan_delinquency_status", "delinquency_12") \
        .withColumnRenamed("current_actual_upb", "upb_12") \
        .select("quarter", "loan_id", "timestamp", "delinquency_12", "upb_12", "timestamp_month", "timestamp_year") \
        .join(aggDF, ["loan_id", "quarter"], "left_outer")
    # calculate the 12 month delinquency and upb values
    months = 12
    monthArray = [lit(x) for x in range(0, 12)]
    # explode on a small amount of data is actually slightly more efficient than a cross join
    testDf = joinedDf \
        .withColumn("month_y", explode(array(monthArray))) \
        .select(
            col("quarter"),
            # "josh_mody": months elapsed since Jan 2000 (24000 = 2000*12),
            # bucketed into 12-month windows; josh_mody_n shifts the window
            # start by month_y (0..11) to cover every 12-month alignment.
            floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000) / months).alias("josh_mody"),
            floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000 - col("month_y")) / months).alias("josh_mody_n"),
            col("ever_30"),
            col("ever_90"),
            col("ever_180"),
            col("delinquency_30"),
            col("delinquency_90"),
            col("delinquency_180"),
            col("loan_id"),
            col("month_y"),
            col("delinquency_12"),
            col("upb_12")) \
        .groupBy("quarter", "loan_id", "josh_mody_n", "ever_30", "ever_90", "ever_180", "delinquency_30", "delinquency_90", "delinquency_180", "month_y") \
        .agg(max("delinquency_12").alias("delinquency_12"), min("upb_12").alias("upb_12")) \
        .withColumn("timestamp_year", floor((lit(24000) + (col("josh_mody_n") * lit(months)) + (col("month_y") - 1)) / lit(12))) \
        .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \
        .withColumn("timestamp_month", when(col("timestamp_month_tmp") == lit(0), lit(12)).otherwise(col("timestamp_month_tmp"))) \
        .withColumn("delinquency_12", ((col("delinquency_12") > 3).cast("int") + (col("upb_12") == 0).cast("int")).alias("delinquency_12")) \
        .drop("timestamp_month_tmp", "josh_mody_n", "month_y")
    # Join the windowed aggregates back onto the original monthly rows by
    # (quarter, loan, year, month), then drop the helper key columns.
    return perf.withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
        .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
        .join(testDf, ["quarter", "loan_id", "timestamp_year", "timestamp_month"], "left") \
        .drop("timestamp_year", "timestamp_month")
def _create_acquisition(spark, acq):
    """Normalize seller names on the acquisition table and parse its dates.

    The raw seller_name is kept in 'old_name'; seller_name becomes the
    canonical name from _name_mapping (unchanged when no mapping matches).
    """
    seller_names = spark.createDataFrame(
        _name_mapping, ["from_seller_name", "to_seller_name"])
    mapped = acq.join(
        seller_names, col("seller_name") == col("from_seller_name"), "left")
    return mapped.drop("from_seller_name") \
        .withColumn("old_name", col("seller_name")) \
        .withColumn("seller_name", coalesce(col("to_seller_name"), col("seller_name"))) \
        .drop("to_seller_name") \
        .withColumn("orig_date", to_date(col("orig_date"), "MM/yyyy")) \
        .withColumn("first_pay_date", to_date(col("first_pay_date"), "MM/yyyy"))
def _gen_dictionary(etl_df, col_names):
    """Build a dictionary table (column_id, data, id) for the given columns.

    Each distinct non-null value per column gets an integer id, numbered by
    descending frequency within that column (most frequent value -> id 1).
    """
    exploded = etl_df.select(posexplode(array([col(name) for name in col_names])))
    value_counts = (exploded
                    .withColumnRenamed("pos", "column_id")
                    .withColumnRenamed("col", "data")
                    .filter("data is not null")
                    .groupBy("column_id", "data")
                    .count())
    by_frequency = Window.partitionBy("column_id").orderBy(desc("count"))
    return value_counts.withColumn("id", row_number().over(by_frequency)).drop("count")
def _cast_string_columns_to_numeric(spark, input_df):
    """Replace every categorical string column with its integer dictionary id.

    Uses _gen_dictionary over cate_col_names; the dictionary is cached since
    it is filtered once per categorical column.
    """
    dictionary = _gen_dictionary(input_df, cate_col_names).cache()
    result = input_df
    for column_id, name in enumerate(cate_col_names):
        # Slice out this column's dictionary and broadcast it for the join.
        ids_for_column = (dictionary
                          .filter(col("column_id") == column_id)
                          .drop("column_id")
                          .withColumnRenamed("data", name))
        result = (result
                  .join(broadcast(ids_for_column), name, "left")
                  .drop(name)
                  .withColumnRenamed("id", name))
    return result
def run_mortgage(spark, perf, acq, stop_part):
    """Run the mortgage ETL over raw performance and acquisition frames.

    When *stop_part* is true, return the joined (but still categorical)
    frame; otherwise also dictionary-encode the string columns, binarize the
    label, and fill nulls with 0.
    """
    delinquency = _create_perf_deliquency(spark, _parse_dates(perf))
    acquisition = _create_acquisition(spark, acq)
    clean_df = delinquency.join(
        acquisition, ["loan_id", "quarter"], "inner").drop("quarter")
    if stop_part:
        return clean_df
    return _cast_string_columns_to_numeric(spark, clean_df) \
        .select(all_col_names) \
        .withColumn(label_col_name, when(col(label_col_name) > 0, 1).otherwise(0)) \
        .fillna(float(0))
def main():
    """Load transcoded data (parquet or yosegi), run the ETL, write the result.

    Command-line flags:
      --part    stop after the join step (skip categorical -> numeric cast)
      --format  which transcoded copy to read: 'parquet' (default) or 'yosegi'
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--part', action='store_true', default=False)
    parser.add_argument(
        '--format',
        type=str,
        choices=['parquet', 'yosegi'],
        default='parquet')
    args = parser.parse_args()
    print(args)
    spark = (SparkSession
             .builder
             .appName('MortgageETL')
             .getOrCreate())
    # Pick the reader implementation matching the transcoded format.
    if args.format == 'parquet':
        out_type = 'parquet'
        reader_format = 'parquet'
    else:
        out_type = 'yosegi'
        reader_format = 'jp.co.yahoo.yosegi.spark.YosegiFileFormat'
    input_dir = os.path.join('/data/dataset/mortgage/yosegi_repro/', out_type)
    perf_path = os.path.join(input_dir, f'perf.{out_type}')
    acq_path = os.path.join(input_dir, f'acq.{out_type}')
    print(f'perf_path = {perf_path}')
    print(f'acq_path = {acq_path}')
    # Read the previously transcoded datasets.
    perf = spark.read.format(reader_format).load(perf_path)
    acq = spark.read.format(reader_format).load(acq_path)
    # Run the ETL and materialize the result so any reader-format bug surfaces.
    # (The unused 'result_count' accumulator from the original was removed.)
    out = run_mortgage(spark, perf, acq, args.part)
    out.show(10)
    out.write.parquet(
        f'/data/dataset/mortgage/yosegi_repro/tmp/spark.{out_type}',
        mode='overwrite')
    spark.stop()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment