Last active
January 27, 2021 19:05
-
-
Save dvannoy/c43208fd928ec11bb6c558d311570dcb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load the taxi zone lookup CSV. The file has a header row, and the explicit
// schema (taxiZoneSchema) is applied rather than letting Spark infer types.
DataFrame zoneDF = spark
    .Read()
    .Option("header", "true")
    .Schema(taxiZoneSchema)
    .Csv(taxiZoneSourcePath);

// Persist the lookup as a Delta table, replacing whatever is already at taxiZonePath.
zoneDF
    .Write()
    .Mode("overwrite")
    .Format("delta")
    .Save(taxiZonePath);

// Display a sample of the loaded rows.
zoneDF.Show();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Spark.Sql; | |
using Microsoft.Spark.Sql.Types; | |
using static Microsoft.Spark.Sql.Functions; | |
// Source locations.
string yellowSourcePath = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet";
string taxiZoneSourcePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone_lookup.csv";

// Destination locations.
string taxiZonePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_zone";
string taxiRatePath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/lookups/taxi_rate_code";
string yellowDeltaPath = "abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta";

// Pattern handed to to_date when parsing pickup/dropoff values.
string dateFormat = "yyyy-MM-dd HH:mm:ss";

// Explicit Spark schema for the taxi zone lookup file.
// Building a StructType by hand is one of several ways to declare a schema.
StructField[] taxiZoneFields =
{
    new StructField("LocationID", new IntegerType()),
    new StructField("Borough", new StringType()),
    new StructField("Zone", new StringType()),
    new StructField("ServiceZone", new StringType()),
};
var taxiZoneSchema = new StructType(taxiZoneFields);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read the yellow taxi trip Parquet files found at yellowSourcePath.
DataFrame inputDF = spark.Read()
    .Option("inferSchema", "true")
    .Parquet(yellowSourcePath);

// Take your pick on how to transform: WithColumn or SQL expressions. Only one of these is needed.
// Option A
// var transformedDF = inputDF
//     .WithColumn("yearMonth", RegexpReplace(Substring(Col("tpepPickupDatetime"),1,7), "-", "_"))
//     .WithColumn("pickupDt", ToDate(Col("tpepPickupDatetime"), dateFormat))
//     .WithColumn("dropoffDt", ToDate(Col("tpepDropoffDatetime"), dateFormat))
//     .WithColumn("tipPct", Col("tipAmount") / Col("totalAmount"));

// Option B
// Keeps every input column and adds:
//   yearMonth - first 7 characters of the pickup timestamp with "-" replaced by "_"
//   pickupDt  - pickup timestamp converted with to_date using dateFormat
//   dropoffDt - dropoff timestamp converted with to_date using dateFormat
//   tipPct    - tipAmount divided by totalAmount
var transformedDF = inputDF.SelectExpr(
    "*",
    "replace(left(tpepPickupDatetime, 7),\"-\",\"_\") as yearMonth",
    $"to_date(tpepPickupDatetime, \"{dateFormat}\") as pickupDt",
    $"to_date(tpepDropoffDatetime, \"{dateFormat}\") as dropoffDt",
    "tipAmount/totalAmount as tipPct");

// Load the taxi zone lookup previously saved in Delta format.
DataFrame zoneDF = spark.Read().Format("delta").Load(taxiZonePath);

// Join to bring in Taxi Zone data for the pickup location. A left join keeps
// trips whose PULocationID has no match in the lookup.
var tripDF = transformedDF
    .Join(zoneDF, transformedDF["PULocationID"] == zoneDF["LocationID"], "left")
    .Drop("LocationID")
    // The lookup column is spelled "Borough". WithColumnRenamed silently does
    // nothing when the source column is missing, so the name must match exactly.
    .WithColumnRenamed("Borough", "PickupBorough")
    .WithColumnRenamed("Zone", "PickupZone")
    .WithColumnRenamed("ServiceZone", "PickupServiceZone");

// Write the enriched trips as a Delta table partitioned by yearMonth,
// replacing any existing data at yellowDeltaPath.
tripDF.Write().Mode("overwrite").PartitionBy("yearMonth").Format("delta").Save(yellowDeltaPath);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read back the Delta table written to yellowDeltaPath, capped at 20 rows.
DataFrame testDF = spark
    .Read()
    .Format("delta")
    .Load(yellowDeltaPath)
    .Limit(20);

// Show a handful of columns to confirm the data landed.
testDF
    .Select("VendorID", "tpepPickupDatetime", "tpepDropoffDatetime", "passengerCount")
    .Show();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment