Skip to content

Instantly share code, notes, and snippets.

@yossisp
Created August 8, 2023 13:19
Show Gist options
  • Save yossisp/7ce90216e61a548198489450a99052fa to your computer and use it in GitHub Desktop.
Save yossisp/7ce90216e61a548198489450a99052fa to your computer and use it in GitHub Desktop.
How to convert a CRUD status-change log table into a table with date and entity columns?
# Notebook-only line: `!pip` is an IPython/Jupyter shell escape, not Python
# syntax. Outside a notebook, run the install command in a shell instead.
# NOTE(review): spark-nlp and pandas are not used by the code shown here.
!pip install -q pyspark==3.3.0 spark-nlp==5.0.0
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Obtain the SparkSession used for all DataFrame and SQL work below.
# getOrCreate() returns an already-active session if there is one,
# otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)
# Sample status-change log (a JSON array of events). It holds a single entity,
# parent "p1" / child "c1". The entity is created ('new') on 2023-07-01 and
# seen as 'existing' up to 2023-07-05, but it never has a 'deleted' event, so
# its active period is open-ended.
json_data_no_deleted = '''
[
{
"timestamp": "2023-07-01T12:00:00Z",
"parent": "p1",
"child": "c1",
"status": "new"
},
{
"timestamp": "2023-07-01T13:00:00Z",
"parent": "p1",
"child": "c1",
"status": "existing"
},
{
"timestamp": "2023-07-05T13:00:00Z",
"parent": "p1",
"child": "c1",
"status": "existing"
}
]
'''
# The active SparkContext behind `spark`; it is the same object that
# SparkContext.getOrCreate() would return.
sc = spark.sparkContext

# Wrap the whole JSON document as a single RDD element. Spark parses that
# element as one JSON value, and its top-level array becomes one row per
# event.
rdd = sc.parallelize([json_data_no_deleted])
df_no_deleted = spark.read.json(rdd)
df_no_deleted.show()

# Expose the DataFrame to Spark SQL under the name the query below uses.
df_no_deleted.createOrReplaceTempView("df_no_deleted")
# Expand the status-change log into one row per (date, parent, child) for
# every day on which that entity was active.
#
# - An active period starts at each 'new' event.
# - It ends at the first later 'deleted' event for the same (parent, child).
#   Both the start and end days are included, because SEQUENCE is inclusive.
# - If no later 'deleted' event exists, the period is closed at the latest
#   timestamp anywhere in the log (the last moment the data describes). It is
#   not left NULL: SEQUENCE(start, NULL) is NULL and EXPLODE(NULL) emits no
#   rows, so still-active entities would silently disappear.
# - Pairing uses a forward-looking window rather than positional row numbers,
#   so consecutive 'new' events or re-creation after deletion are handled.
#   DISTINCT removes days that overlapping periods would otherwise repeat.
#
# NOTE(review): JSON schema inference presumably leaves `timestamp` as a
# string (confirm with df_no_deleted.printSchema()). If so, ORDER BY and
# MIN/MAX are lexicographic, which is chronological only while every value
# uses the same "YYYY-MM-DDTHH:MM:SSZ" UTC layout, as in the sample data.
sql_results = spark.sql("""
WITH
log_bounds AS (
    SELECT MAX(timestamp) AS log_end
    FROM df_no_deleted
),
lifecycle AS (
    SELECT
        parent,
        child,
        status,
        timestamp,
        MIN(CASE WHEN status = 'deleted' THEN timestamp END) OVER (
            PARTITION BY parent, child
            ORDER BY timestamp
            ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
        ) AS next_deleted
    FROM
        df_no_deleted
    WHERE
        status IN ('new', 'deleted')
),
matched_periods AS (
    SELECT
        l.parent,
        l.child,
        l.timestamp AS start_date,
        COALESCE(l.next_deleted, b.log_end) AS end_date
    FROM
        lifecycle l
        CROSS JOIN log_bounds b
    WHERE
        l.status = 'new'
),
final_table AS (
    SELECT
        EXPLODE(SEQUENCE(TO_DATE(start_date), TO_DATE(end_date), INTERVAL 1 DAY)) AS `date`,
        parent,
        child
    FROM
        matched_periods
)
SELECT DISTINCT `date`, parent, child
FROM final_table
ORDER BY parent, child, `date`
""")
sql_results.show(truncate=False)
# For df_no_deleted this yields one row per day from 2023-07-01 through
# 2023-07-05 for (p1, c1).
# ------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment