@thanoojgithub
Last active March 27, 2022 16:44
Sample code 2 - Implementing SCD Type 2 Data model using PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit, to_timestamp, when
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('pyspark-test-run') \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Temporary GCS bucket used by the BigQuery connector when writing.
temporaryGcsBucket = "temporarygcsbucket1"
spark.conf.set('temporaryGcsBucket', temporaryGcsBucket)
# df1: existing SCD Type 2 table in BigQuery; df2: new snapshot delivered as CSV.
df1 = spark.read.format('bigquery').option('table', 'cancellationData1.cancellationData').load()
df2 = spark.read.format("csv").options(header='True', inferSchema='True', delimiter=',') \
    .load("gs://bucket_27122021/data/cancellationData/Cancellation_Details_Jan012022.csv")
# For now, convert the CSV to Parquet and write it to the GCS bucket so the transformation starts from the expected input format.
df2.write.mode("overwrite").parquet("gs://bucket_27122021/data/cancellationData/Jan2022/parquet")
# The actual transformation starts here.
# Read the airline cancellation data as Parquet from the GCS bucket and drop unwanted columns.
df2_1 = spark.read.parquet("gs://bucket_27122021/data/cancellationData/Jan2022/parquet") \
    .drop('CreatedTimestamp', 'UpdatedTimestamp')
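# Keep only rows that have both a code and a description.
df2_2 = df2_1.filter(col("CancellationCode").isNotNull() & col("CancellationDesc").isNotNull())
# Stamp the incoming rows as the new current versions: an EndDate of 9999-12-31 marks an open record.
ccDF2_3 = df2_2.withColumn("StartDate", to_timestamp(lit("2022-01-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn("EndDate", to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss'))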
df1.show(truncate=False)
ccDF2_3.show(truncate=False)
# Historical rows: versions that were already closed by an earlier load.
ccDF1_1 = df1.filter(to_timestamp(col('EndDate')) < to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss'))
ccDF1_1.show(truncate=False)
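# Current rows: close every open version whose code appears in the incoming batch.
# Note: a row is closed whenever its code reappears, even if CancellationDesc is unchanged.
incoming_codes = [row.CancellationCode for row in ccDF2_3.select('CancellationCode').distinct().collect()]
ccDF1_2 = df1.filter(to_timestamp(col('EndDate')) == to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn("EndDate",
                when(col('CancellationCode').isin(incoming_codes), current_timestamp())
                .otherwise(col('EndDate')))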
ccDF1_2.printSchema()
ccDF1_2.show(truncate=False)
# Union the history, the freshly closed rows, and the new current rows, sorted per code.
ccDF1_3 = ccDF1_1.unionByName(ccDF1_2).unionByName(ccDF2_3) \
    .orderBy(col('CancellationCode'), col('EndDate'))
ccDF1_3.printSchema()
ccDF1_3.show(truncate=False)
# Overwrite the BigQuery table with the rebuilt history.
ccDF1_3.write.format('bigquery') \
    .option('table', 'cancellationData1.cancellationData') \
    .partitionBy('CancellationCode') \
    .mode("overwrite") \
    .save()
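The script above closes every open row whose code reappears in the snapshot, whether or not the description changed. A common SCD Type 2 variant only closes a row when a tracked attribute actually differs. The following is a minimal plain-Python sketch of that rule, not the gist's Spark code: `scd2_merge`, the dict layout, and the use of the load time as both the closing `end` and the new `start` are assumptions made for illustration.

```python
from datetime import datetime

OPEN_END = datetime(9999, 12, 31, 23, 59, 59)  # sentinel for "still current"


def scd2_merge(existing, incoming, load_time):
    """Hypothetical helper: merge a snapshot into SCD Type 2 history.

    existing: list of dicts with keys code, desc, start, end
    incoming: dict mapping code -> latest description
    """
    history = [dict(r) for r in existing if r["end"] != OPEN_END]
    current = {r["code"]: r for r in existing if r["end"] == OPEN_END}
    merged = list(history)

    for code, row in current.items():
        new_desc = incoming.get(code)
        if new_desc is not None and new_desc != row["desc"]:
            # Description changed: close the old version, open a new one.
            merged.append({**row, "end": load_time})
            merged.append({"code": code, "desc": new_desc,
                           "start": load_time, "end": OPEN_END})
        else:
            # Unchanged (or absent from the snapshot): leave the row open.
            merged.append(dict(row))

    for code, desc in incoming.items():
        if code not in current:
            # Brand-new code: insert it as the current version.
            merged.append({"code": code, "desc": desc,
                           "start": load_time, "end": OPEN_END})

    return sorted(merged, key=lambda r: (r["code"], r["end"]))


first_load = datetime(2021, 12, 31, 23, 59, 59)
second_load = datetime(2022, 1, 31, 23, 59, 59)

existing = [
    {"code": "A", "desc": "Due to heavy rainfall", "start": first_load, "end": OPEN_END},
    {"code": "B", "desc": "Due to heavy snowfall", "start": first_load, "end": OPEN_END},
    {"code": "C", "desc": "Due to some technical issue", "start": first_load, "end": OPEN_END},
]
incoming = {
    "A": "Due to heavy rainfall",
    "B": "Due to heavy near by tornadoes",
    "C": "Due to some technical issue",
    "D": "Due to bad radio signals",
    "E": "Due to some technical glitches",
}

merged = scd2_merge(existing, incoming, second_load)
for row in merged:
    print(row["code"], row["desc"], row["start"], row["end"])
```

With this rule only code B gets a closed version, because it is the only existing code whose description changed; A and C stay open with their original StartDate, and D and E are inserted as new current rows.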
Output of df1.show(truncate=False):
+----------------+------------------------------+-------------------+-----------------------+
|CancellationCode|CancellationDesc              |StartDate          |EndDate                |
+----------------+------------------------------+-------------------+-----------------------+
|A               |Due to heavy rainfall         |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|B               |Due to heavy snowfall         |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|C               |Due to some technical issue   |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|A               |Due to heavy rainfall         |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|B               |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59    |
|C               |Due to some technical issue   |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|D               |Due to bad radio signals      |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|E               |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59    |
+----------------+------------------------------+-------------------+-----------------------+
Output of ccDF2_3.show(truncate=False):
+----------------+------------------------------+-------------------+-------------------+
|CancellationCode|CancellationDesc              |StartDate          |EndDate            |
+----------------+------------------------------+-------------------+-------------------+
|A               |Due to heavy rainfall         |2022-01-31 23:59:59|9999-12-31 23:59:59|
|C               |Due to some technical issue   |2022-01-31 23:59:59|9999-12-31 23:59:59|
|B               |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59|
|E               |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59|
|D               |Due to bad radio signals      |2022-01-31 23:59:59|9999-12-31 23:59:59|
+----------------+------------------------------+-------------------+-------------------+
Output of ccDF1_1.show(truncate=False):
+----------------+---------------------------+-------------------+-----------------------+
|CancellationCode|CancellationDesc           |StartDate          |EndDate                |
+----------------+---------------------------+-------------------+-----------------------+
|A               |Due to heavy rainfall      |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|B               |Due to heavy snowfall      |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|C               |Due to some technical issue|2021-12-31 23:59:59|2022-01-31 08:21:12.648|
+----------------+---------------------------+-------------------+-----------------------+
Output of ccDF1_2.printSchema() and ccDF1_2.show(truncate=False):
root
 |-- CancellationCode: string (nullable = true)
 |-- CancellationDesc: string (nullable = true)
 |-- StartDate: timestamp (nullable = false)
 |-- EndDate: timestamp (nullable = true)
+----------------+------------------------------+-------------------+-----------------------+
|CancellationCode|CancellationDesc              |StartDate          |EndDate                |
+----------------+------------------------------+-------------------+-----------------------+
|A               |Due to heavy rainfall         |2022-01-31 23:59:59|2022-01-31 08:56:50.429|
|B               |Due to heavy near by tornadoes|2022-01-31 23:59:59|2022-01-31 08:56:50.429|
|C               |Due to some technical issue   |2022-01-31 23:59:59|2022-01-31 08:56:50.429|
|D               |Due to bad radio signals      |2022-01-31 23:59:59|2022-01-31 08:56:50.429|
|E               |Due to some technical glitches|2022-01-31 23:59:59|2022-01-31 08:56:50.429|
+----------------+------------------------------+-------------------+-----------------------+
Output of ccDF1_3.printSchema() and ccDF1_3.show(truncate=False):
root
 |-- CancellationCode: string (nullable = true)
 |-- CancellationDesc: string (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
+----------------+------------------------------+-------------------+-----------------------+
|CancellationCode|CancellationDesc              |StartDate          |EndDate                |
+----------------+------------------------------+-------------------+-----------------------+
|A               |Due to heavy rainfall         |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|A               |Due to heavy rainfall         |2022-01-31 23:59:59|2022-01-31 08:56:51.105|
|A               |Due to heavy rainfall         |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|B               |Due to heavy snowfall         |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|B               |Due to heavy near by tornadoes|2022-01-31 23:59:59|2022-01-31 08:56:51.105|
|B               |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59    |
|C               |Due to some technical issue   |2021-12-31 23:59:59|2022-01-31 08:21:12.648|
|C               |Due to some technical issue   |2022-01-31 23:59:59|2022-01-31 08:56:51.105|
|C               |Due to some technical issue   |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|D               |Due to bad radio signals      |2022-01-31 23:59:59|2022-01-31 08:56:51.105|
|D               |Due to bad radio signals      |2022-01-31 23:59:59|9999-12-31 23:59:59    |
|E               |Due to some technical glitches|2022-01-31 23:59:59|2022-01-31 08:56:51.105|
|E               |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59    |
+----------------+------------------------------+-------------------+-----------------------+
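A quick way to sanity-check a rebuilt SCD Type 2 table is to test two invariants: each code has exactly one open row, and no row ends before it starts. The sketch below applies both checks in plain Python to the (code, StartDate, EndDate) values transcribed from the final table above. The helper names `ts` and `check_scd2` are hypothetical, and the invariants are a common convention assumed here, not something the gist itself enforces.

```python
from collections import Counter
from datetime import datetime

OPEN_END = datetime(9999, 12, 31, 23, 59, 59)  # sentinel for "still current"


def ts(text):
    """Parse a timestamp as printed by show(), with or without milliseconds."""
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in text else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(text, fmt)


# (code, StartDate, EndDate) triples transcribed from the final table.
rows = []
for code in "ABC":
    rows.append((code, "2021-12-31 23:59:59", "2022-01-31 08:21:12.648"))
for code in "ABCDE":
    rows.append((code, "2022-01-31 23:59:59", "2022-01-31 08:56:51.105"))
    rows.append((code, "2022-01-31 23:59:59", "9999-12-31 23:59:59"))


def check_scd2(rows):
    """Return (codes without exactly one open row, rows whose end precedes start)."""
    open_counts = Counter(code for code, _, end in rows if ts(end) == OPEN_END)
    codes = {code for code, _, _ in rows}
    bad_open = sorted(c for c in codes if open_counts[c] != 1)
    inverted = sorted(r for r in rows if ts(r[2]) < ts(r[1]))
    return bad_open, inverted


bad_open, inverted = check_scd2(rows)
print("codes without exactly one open row:", bad_open)
print("rows ending before they start:", len(inverted))
```

On this data every code has exactly one open row, but five closed rows carry an EndDate (08:56:51.105) earlier than their StartDate (23:59:59) on the same day. That is what happens when a batch stamped with an end-of-day StartDate is closed by `current_timestamp()` on a run earlier that day.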