@lazykyama
Created July 13, 2020 04:21
Spark 3.0 with GPU issue repro code.
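The gist doesn't record how the scripts below were launched. A minimal sketch, assuming each script is pasted into a spark-shell session with the RAPIDS Accelerator enabled (the jar variables are placeholders, not from the gist; the 16g driver memory matches the note in the first script):

spark-shell --driver-memory 16g \
  --jars ${RAPIDS_JAR},${CUDF_JAR} \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin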
// This code is based on the example below.
// https://github.com/rapidsai/spark-examples/blob/master/datasets/ETL/MortgageETL.ipynb
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads a raw mortgage performance file: "|"-delimited CSV without a header,
// parsed against an explicit schema, with a "quarter" column derived from
// the input file name.
object ReadPerformanceCsv {
  def apply(spark: SparkSession, path: String): DataFrame = {
    val performanceSchema = StructType(Array(
      StructField("loan_id", LongType),
      StructField("monthly_reporting_period", StringType),
      StructField("servicer", StringType),
      StructField("interest_rate", DoubleType),
      StructField("current_actual_upb", DoubleType),
      StructField("loan_age", DoubleType),
      StructField("remaining_months_to_legal_maturity", DoubleType),
      StructField("adj_remaining_months_to_maturity", DoubleType),
      StructField("maturity_date", StringType),
      StructField("msa", DoubleType),
      StructField("current_loan_delinquency_status", IntegerType),
      StructField("mod_flag", StringType),
      StructField("zero_balance_code", StringType),
      StructField("zero_balance_effective_date", StringType),
      StructField("last_paid_installment_date", StringType),
      StructField("foreclosed_after", StringType),
      StructField("disposition_date", StringType),
      StructField("foreclosure_costs", DoubleType),
      StructField("prop_preservation_and_repair_costs", DoubleType),
      StructField("asset_recovery_costs", DoubleType),
      StructField("misc_holding_expenses", DoubleType),
      StructField("holding_taxes", DoubleType),
      StructField("net_sale_proceeds", DoubleType),
      StructField("credit_enhancement_proceeds", DoubleType),
      StructField("repurchase_make_whole_proceeds", StringType),
      StructField("other_foreclosure_proceeds", DoubleType),
      StructField("non_interest_bearing_upb", DoubleType),
      StructField("principal_forgiveness_upb", StringType),
      StructField("repurchase_make_whole_proceeds_flag", StringType),
      StructField("foreclosure_principal_write_off_amount", StringType),
      StructField("servicing_activity_indicator", StringType)))

    // Drop the file extension, then take the token after the last underscore.
    // For "Performance_2000Q1.txt" this yields "2000Q1"; note that with the
    // "_13k" suffix used below it yields "13k" instead.
    val getQuarter = spark.udf.register("get_quarter", (path: String) => {
      path.split("\\.").head.split("_").last
    })

    spark.read.format("csv")
      .option("nullValue", "")
      .option("header", "false")
      .option("delimiter", "|")
      .option("parserLib", "univocity") // legacy spark-csv option; ignored by Spark's built-in CSV reader
      .schema(performanceSchema)
      .load(path)
      .withColumn("quarter", getQuarter(input_file_name()))
  }
}
// Reads a raw mortgage acquisition file with the same conventions as above.
object ReadAcquisitionCsv {
  def apply(spark: SparkSession, path: String): DataFrame = {
    val acquisitionSchema = StructType(Array(
      StructField("loan_id", LongType),
      StructField("orig_channel", StringType),
      StructField("seller_name", StringType),
      StructField("orig_interest_rate", DoubleType),
      StructField("orig_upb", IntegerType),
      StructField("orig_loan_term", IntegerType),
      StructField("orig_date", StringType),
      StructField("first_pay_date", StringType),
      StructField("orig_ltv", DoubleType),
      StructField("orig_cltv", DoubleType),
      StructField("num_borrowers", DoubleType),
      StructField("dti", DoubleType),
      StructField("borrower_credit_score", DoubleType),
      StructField("first_home_buyer", StringType),
      StructField("loan_purpose", StringType),
      StructField("property_type", StringType),
      StructField("num_units", IntegerType),
      StructField("occupancy_status", StringType),
      StructField("property_state", StringType),
      StructField("zip", IntegerType),
      StructField("mortgage_insurance_percent", DoubleType),
      StructField("product_type", StringType),
      StructField("coborrow_credit_score", DoubleType),
      StructField("mortgage_insurance_type", DoubleType),
      StructField("relocation_mortgage_indicator", StringType)))

    // Same quarter-extraction UDF as in ReadPerformanceCsv.
    val getQuarter = spark.udf.register("get_quarter", (path: String) => {
      path.split("\\.").head.split("_").last
    })

    spark.read.format("csv")
      .option("header", "false")
      .option("delimiter", "|")
      .schema(acquisitionSchema)
      .load(path)
      .withColumn("quarter", getQuarter(input_file_name()))
  }
}
val suffix = "_13k"
val inputDir = "/data/dataset/mortgage/repro_minimal/org"
val outputDir = "/data/dataset/mortgage/repro_minimal/parquet"
val perfPath = inputDir + "/Performance_2000Q1" + suffix + ".txt"
println("perfPath: " + perfPath)
val perfCsv = ReadPerformanceCsv(spark, perfPath)
println("#records of perfCsv: " + perfCsv.count())
// Note: 16gb memory is required for driver (and executor?) memory.
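// Optional check, added for illustration and not part of the original gist:
// the driver memory this session was actually launched with can be read back
// from the Spark configuration.
println("driver memory: " + spark.sparkContext.getConf.get("spark.driver.memory", "<unset>"))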
val outPerfPath = outputDir + "/Performance_2000Q1" + suffix + ".parquet"
perfCsv.write.format("parquet").save(outPerfPath)
// --- Second script (run as a separate step): load the Parquet output written above. ---
// This code is based on the example below.
// https://github.com/rapidsai/spark-examples/blob/master/datasets/ETL/MortgageETL.ipynb
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

val storageLevel = StorageLevel.MEMORY_ONLY

val suffix = "_13k"
val inputDir = "/data/dataset/mortgage/repro_minimal/parquet"
val perfPath = inputDir + "/Performance_2000Q1" + suffix + ".parquet"
println("perfPath: " + perfPath)

// Load the Parquet data, cache it in memory, and materialize a few rows.
val perfParquet = spark.read.format("parquet").load(perfPath).persist(storageLevel)
perfParquet.show(10)
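// Optional addition (not in the original gist): with the RAPIDS plugin loaded,
// the physical plan reveals whether operators were placed on the GPU
// (Gpu* nodes) or fell back to the CPU.
perfParquet.explain()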