Skip to content

Instantly share code, notes, and snippets.

Last active February 4, 2021 05:31
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
// Load the taxi zone lookup CSV.
// header=true tells the reader the first line is a header row rather than data;
// taxiZoneSchema supplies the column names and types so nothing is inferred.
val zoneDF = spark.read
  .option("header", "true")
  .schema(taxiZoneSchema)
  .csv(taxiZoneSourcePath)
// Load the raw yellow taxi trip data.
// NOTE(review): the right-hand side of this assignment was missing from this copy of the
// snippet; reading the Parquet files at yellowSourcePath is assumed — confirm against the
// original notebook.
val inputDF = spark.read.parquet(yellowSourcePath)
// Take your pick on how to transform, withColumn or SQL Expressions. Only one of these is needed.
// Option A
// val transformedDF = {
// inputDF
// .withColumn("yearMonth", regexp_replace(substring(col("tpepPickupDatetime"), 1, 7), "-", "_"))
// .withColumn("pickupDt", to_date(col("tpepPickupDatetime"), dateFormat))
// .withColumn("dropoffDt", to_date(col("tpepDropoffDatetime"), dateFormat))
// .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
// }
// Option B
// Derive yearMonth, pickupDt, dropoffDt and tipPct with SQL expressions.
// selectExpr returns ONLY the expressions it is given, so "*" is listed first to keep every
// original input column. Without it this variant would drop columns that the withColumn
// variant retains, and the two would not be interchangeable.
val transformedDF = inputDF.selectExpr(
  "*",
  // First 7 characters of the pickup timestamp ("yyyy-MM"), with '-' replaced by '_'.
  "replace(left(tpepPickupDatetime, 7),'-','_') as yearMonth",
  // dateFormat is interpolated into the SQL text as the parse pattern for to_date.
  s"to_date(tpepPickupDatetime, '$dateFormat') as pickupDt",
  s"to_date(tpepDropoffDatetime, '$dateFormat') as dropoffDt",
  "tipAmount/totalAmount as tipPct"
)
// Load the taxi zone lookup data from the Delta table stored at taxiZonePath.
val zoneDF = spark.read.format("delta").load(taxiZonePath)
// Join to bring in Taxi Zone data for the pickup location.
// A left join keeps trips whose PULocationID has no matching zone row; the zone-side
// LocationID is dropped afterwards because it duplicates PULocationID.
// The zone columns are then renamed with a "Pickup" prefix. The source column is spelled
// "Borough" in the taxi zone schema; the earlier spelling "Burough" matched no column, and
// withColumnRenamed silently does nothing in that case. The target name "PickupBurrough" is
// kept as-is so downstream readers of that column are not broken.
// NOTE(review): the left-hand DataFrame name was missing from this copy of the snippet;
// transformedDF is assumed — confirm against the original notebook.
val tripDF = {
  transformedDF.as("t")
    .join(zoneDF.as("z"), expr("t.PULocationID == z.LocationID"), joinType = "left")
    .drop("LocationID")
    .withColumnRenamed("Borough", "PickupBurrough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import spark.implicits._
// Storage locations used by the pipeline.
// NOTE(review): the abfss:// values contain only the URI scheme — presumably placeholders
// to be filled in with real container/account paths; confirm before running.

// Glob matching the source Parquet files (wasbs:// scheme).
val yellowSourcePath: String = "wasbs://*/*.parquet"

// Location of the taxi zone source file.
val taxiZoneSourcePath: String = "abfss://"

// Output/lookup table locations.
val taxiZonePath: String = "abfss://"
val taxiRatePath: String = "abfss://"
val yellowDeltaPath: String = "abfss://"

// Timestamp pattern passed to to_date when parsing pickup/dropoff datetimes.
val dateFormat: String = "yyyy-MM-dd HH:mm:ss"
// Define a schema that Spark understands. This is one of several ways to do it.
// Each StructField gives a column name and type; nullability is left at the StructField
// default. The closing "))" terminates both the Seq(...) and the StructType(...) call.
val taxiZoneSchema = StructType(Seq(
  StructField("LocationID", IntegerType),
  StructField("Borough", StringType),
  StructField("Zone", StringType),
  StructField("ServiceZone", StringType)
))
// Sanity check: read back up to 20 rows from the yellow taxi Delta table and print a few
// columns to the console.
// NOTE(review): this line had two statements fused together with tokens missing; the
// `testDF.select(` receiver is reconstructed from the trailing column list and `.show()`.
val testDF = spark.read.format("delta").load(yellowDeltaPath).limit(20)
testDF.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment