Skip to content

Instantly share code, notes, and snippets.

Last active February 18, 2021 05:10
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
from pyspark.sql.functions import col, desc, regexp_replace, substring, to_date, from_json, explode, expr
from pyspark.sql.types import StructType, StringType
# Spark datetime pattern used by to_date() when deriving pickupDt / dropoffDt.
date_format: str = "yyyy-MM-dd HH:mm:ss"

# Storage locations. Each abfss:// entry here is only the scheme prefix; the
# account/container/path part is absent and must be filled in before running.
yellow_source_path: str = "wasbs://*/*.parquet"  # yellow-trip source parquet files (glob)
taxi_zone_source_path: str = "abfss://"  # presumably the raw taxi-zone lookup input
taxi_zone_path: str = "abfss://"  # taxi-zone data, read back in Delta format
taxi_rate_path: str = "abfss://"  # not referenced anywhere in the visible script
yellow_delta_path: str = "abfss://"  # yellow-trip data, read back in Delta format
# Explicit Spark schema for the taxi-zone lookup data, built by chaining
# StructType.add(name, type). This is one of several ways to declare a schema;
# an equivalent DDL string ("LocationID INT, Borough STRING, ...") also works.
# The chain needs a StructType() receiver and a closing parenthesis, otherwise
# the leading ".add" lines are a syntax error.
taxi_zone_schema = (
    StructType()
    .add('LocationID', 'integer')
    .add('Borough', 'string')
    .add('Zone', 'string')
    .add('ServiceZone', 'string')
)
# NOTE(review): the right-hand sides of the next two assignments are missing
# from this copy. The `zone_df = (` expression is never closed and
# `input_df =` has no value, so the script does not parse as written.
# Presumably zone_df loads the taxi-zone lookup from taxi_zone_source_path
# using taxi_zone_schema. Presumably input_df loads the yellow-trip parquet
# files from yellow_source_path. TODO: restore the original reads.
# Later code expects input_df to have tpepPickupDatetime, tpepDropoffDatetime,
# tipAmount, totalAmount and PULocationID columns.
zone_df = (
input_df =
# Take your pick on how to transform: withColumn calls (Option A) or SQL
# expressions via selectExpr (Option B). Only one of these is needed. Both
# keep every input column and append yearMonth, pickupDt, dropoffDt and tipPct.
# Option A
# transformed_df = (
#     input_df
#     .withColumn("yearMonth", regexp_replace(substring("tpepPickupDatetime",1,7), '-', '_'))
#     .withColumn("pickupDt", to_date("tpepPickupDatetime", date_format))
#     .withColumn("dropoffDt", to_date("tpepDropoffDatetime", date_format))
#     .withColumn("tipPct", col("tipAmount") / col("totalAmount"))
# )
# Option B
# selectExpr returns ONLY the listed expressions. The leading "*" carries the
# source columns through, including PULocationID, which the taxi-zone join
# needs. This matches Option A, where withColumn appends to the existing columns.
transformed_df = input_df.selectExpr(
    "*",
    # First 7 chars of the pickup timestamp ("yyyy-MM") with '-' -> '_'.
    "replace(left(tpepPickupDatetime, 7),'-','_') as yearMonth",
    f"to_date(tpepPickupDatetime, '{date_format}') as pickupDt",
    f"to_date(tpepDropoffDatetime, '{date_format}') as dropoffDt",
    # NOTE(review): totalAmount = 0 yields NULL (or a DIVIDE_BY_ZERO error when
    # spark.sql.ansi.enabled is true). Confirm which behavior is wanted.
    "tipAmount/totalAmount as tipPct",
)
# Load the taxi-zone data from its Delta location.
# NOTE(review): the reader call was garbled in this copy. It is restored as
# spark.read.format("delta"), assuming `spark` is the notebook's SparkSession.
zone_df = spark.read.format("delta").load(taxi_zone_path)
# Join to bring in Taxi Zone data for each trip's pickup location (left join,
# so trips with no matching zone are kept with NULL zone columns). The zone
# columns are then prefixed with "Pickup".
trip_df = (
    transformed_df
    .join(zone_df, transformed_df["PULocationID"] == zone_df["LocationID"], how="left")
    # The join key LocationID is redundant with PULocationID.
    .drop("LocationID")
    # The column is spelled "Borough" in taxi_zone_schema. withColumnRenamed
    # is a no-op when the source column does not exist, so a misspelled name
    # here fails silently.
    .withColumnRenamed("Borough", "PickupBorough")
    .withColumnRenamed("Zone", "PickupZone")
    .withColumnRenamed("ServiceZone", "PickupServiceZone")
)
# Spot-check: read 20 rows (unordered) back from the yellow-trip Delta table
# and print a few columns.
# NOTE(review): this line was garbled in this copy. The reader and the
# `test_df.select(` call are restored, assuming `spark` is the notebook's
# SparkSession. Nothing in the visible script writes trip_df to
# yellow_delta_path; presumably a write step was lost, so confirm.
test_df = spark.read.format("delta").load(yellow_delta_path).limit(20)
test_df.select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment