Created
July 13, 2020 04:21
-
-
Save lazykyama/1b6831d4b7b6381ed4e2c9348f55aa5d to your computer and use it in GitHub Desktop.
Spark 3.0 with GPU issue repro code.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This code is based on the example below. | |
// https://github.com/rapidsai/spark-examples/blob/master/datasets/ETL/MortgageETL.ipynb | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.{DataFrame, SparkSession} | |
object ReadPerformanceCsv {
  /** Reads a Fannie Mae mortgage "Performance" pipe-delimited CSV into a DataFrame.
    *
    * @param spark the active SparkSession
    * @param path  input file/glob path; file names are assumed to look like
    *              `Performance_2000Q1*.txt` so the quarter can be derived from them
    * @return a DataFrame with the performance schema plus a derived `quarter` column
    */
  def apply(spark: SparkSession, path: String): DataFrame = {
    val performanceSchema = StructType(Array(
      StructField("loan_id", LongType),
      StructField("monthly_reporting_period", StringType),
      StructField("servicer", StringType),
      StructField("interest_rate", DoubleType),
      StructField("current_actual_upb", DoubleType),
      StructField("loan_age", DoubleType),
      StructField("remaining_months_to_legal_maturity", DoubleType),
      StructField("adj_remaining_months_to_maturity", DoubleType),
      StructField("maturity_date", StringType),
      StructField("msa", DoubleType),
      StructField("current_loan_delinquency_status", IntegerType),
      StructField("mod_flag", StringType),
      StructField("zero_balance_code", StringType),
      StructField("zero_balance_effective_date", StringType),
      StructField("last_paid_installment_date", StringType),
      StructField("foreclosed_after", StringType),
      StructField("disposition_date", StringType),
      StructField("foreclosure_costs", DoubleType),
      StructField("prop_preservation_and_repair_costs", DoubleType),
      StructField("asset_recovery_costs", DoubleType),
      StructField("misc_holding_expenses", DoubleType),
      StructField("holding_taxes", DoubleType),
      StructField("net_sale_proceeds", DoubleType),
      StructField("credit_enhancement_proceeds", DoubleType),
      StructField("repurchase_make_whole_proceeds", StringType),
      StructField("other_foreclosure_proceeds", DoubleType),
      StructField("non_interest_bearing_upb", DoubleType),
      StructField("principal_forgiveness_upb", StringType),
      StructField("repurchase_make_whole_proceeds_flag", StringType),
      StructField("foreclosure_principal_write_off_amount", StringType),
      StructField("servicing_activity_indicator", StringType))
    )
    // Derives e.g. "2000Q1" from ".../Performance_2000Q1.txt": strip the
    // extension, then take the last underscore-separated token.
    // Also registered under "get_quarter" so it is callable from Spark SQL.
    // Renamed from the original `val udf` / lambda-param `path`, which
    // shadowed `functions.udf` and the method parameter `path` respectively.
    val getQuarter = spark.udf.register("get_quarter", (fileName: String) => {
      fileName.split("\\.").head.split("_").last
    })
    // The old `.option("parserLib", "univocity")` was removed: since Spark 2.0
    // univocity is the only CSV parser and the option is silently ignored.
    spark.read.format("csv")
      .option("nullValue", "")
      .option("header", "false")
      .option("delimiter", "|")
      .schema(performanceSchema)
      .load(path)
      .withColumn("quarter", getQuarter(input_file_name()))
  }
}
object ReadAcquisitionCsv {
  /** Reads a Fannie Mae mortgage "Acquisition" pipe-delimited CSV into a DataFrame,
    * appending a `quarter` column derived from each row's source file name.
    *
    * @param spark the active SparkSession
    * @param path  input file/glob path for the acquisition CSV data
    * @return a DataFrame with the acquisition schema plus the derived `quarter` column
    */
  def apply(spark: SparkSession, path: String): DataFrame = {
    // Column name -> Spark SQL type, in on-disk column order.
    val columns: Seq[(String, DataType)] = Seq(
      "loan_id" -> LongType,
      "orig_channel" -> StringType,
      "seller_name" -> StringType,
      "orig_interest_rate" -> DoubleType,
      "orig_upb" -> IntegerType,
      "orig_loan_term" -> IntegerType,
      "orig_date" -> StringType,
      "first_pay_date" -> StringType,
      "orig_ltv" -> DoubleType,
      "orig_cltv" -> DoubleType,
      "num_borrowers" -> DoubleType,
      "dti" -> DoubleType,
      "borrower_credit_score" -> DoubleType,
      "first_home_buyer" -> StringType,
      "loan_purpose" -> StringType,
      "property_type" -> StringType,
      "num_units" -> IntegerType,
      "occupancy_status" -> StringType,
      "property_state" -> StringType,
      "zip" -> IntegerType,
      "mortgage_insurance_percent" -> DoubleType,
      "product_type" -> StringType,
      "coborrow_credit_score" -> DoubleType,
      "mortgage_insurance_type" -> DoubleType,
      "relocation_mortgage_indicator" -> StringType
    )
    val acquisitionSchema =
      StructType(columns.map { case (name, dataType) => StructField(name, dataType) })
    // "2000Q1" from ".../Acquisition_2000Q1.txt": drop the extension, keep the
    // last underscore-separated token. Registered as "get_quarter" for SQL use.
    val quarterFromFileName = spark.udf.register(
      "get_quarter",
      (fileName: String) => fileName.split("\\.").head.split("_").last
    )
    spark.read
      .format("csv")
      .option("header", "false")
      .option("delimiter", "|")
      .schema(acquisitionSchema)
      .load(path)
      .withColumn("quarter", quarterFromFileName(input_file_name()))
  }
}
// --- Driver script: convert the performance CSV to Parquet (run in spark-shell) ---
val suffix = "_13k"
val inputDir = "/data/dataset/mortgage/repro_minimal/org"
val outputDir = "/data/dataset/mortgage/repro_minimal/parquet"

val perfPath = s"$inputDir/Performance_2000Q1$suffix.txt"
println(s"perfPath: $perfPath")

// Count forces a full read, confirming the CSV parses under the schema.
val perfCsv = ReadPerformanceCsv(spark, perfPath)
println(s"#records of perfCsv: ${perfCsv.count()}")

// Note: 16gb memory is required for driver (and executor?) memory.
val outPerfPath = s"$outputDir/Performance_2000Q1$suffix.parquet"
perfCsv.write.format("parquet").save(outPerfPath)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This code is based on the example below. | |
// https://github.com/rapidsai/spark-examples/blob/master/datasets/ETL/MortgageETL.ipynb | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.{DataFrame, SparkSession} | |
import org.apache.spark.storage.StorageLevel | |
// --- Driver script: reload the Parquet produced by the conversion step and cache it ---
val storageLevel = StorageLevel.MEMORY_ONLY
val suffix = "_13k"
val inputDir = "/data/dataset/mortgage/repro_minimal/parquet"

val perfPath = s"$inputDir/Performance_2000Q1$suffix.parquet"
println(s"perfPath: $perfPath")

// Persist in memory, then materialize the first rows to trigger the read.
val perfCsv = spark.read.format("parquet").load(perfPath).persist(storageLevel)
perfCsv.show(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment