@yossisp (Last active August 8, 2023)
How to convert CRUD status change log table into table with date and entity column?
# The code below refers to the following Stackoverflow answer: https://stackoverflow.com/a/76818322/5863693
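# Illustration of the goal (derived from the first dataset below and the query
# the gist marks as correct): given a CRUD change log such as
#   timestamp             parent  child  status
#   2023-07-01T12:00:00Z  p1      c1     new
#   2023-07-01T13:00:00Z  p1      c1     existing
#   2023-07-05T13:00:00Z  p1      c1     deleted
# the desired output has one row per calendar day on which the (parent, child)
# pair existed:
#   date        parent  child
#   2023-07-01  p1      c1
#   2023-07-02  p1      c1
#   2023-07-03  p1      c1
#   2023-07-04  p1      c1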
!pip install -q pyspark==3.3.0
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()
json_data_deleted = '''
[
    {
        "timestamp": "2023-07-01T12:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-07-01T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "existing"
    },
    {
        "timestamp": "2023-07-05T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "deleted"
    }
]
'''
# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_deleted])
# Read the JSON data into a DataFrame
df_deleted = spark.read.json(rdd)
# Show the DataFrame content
df_deleted.show()
df_deleted.createOrReplaceTempView("df_deleted") # the query below returns the correct result
sql_results = spark.sql("""
    WITH transformed_input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt,
            parent,
            child,
            status,
            transform(
                sequence(
                    0,
                    datediff(
                        CAST(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE),
                        CAST(timestamp AS DATE)
                    ) - 1
                ),
                sid -> date_add(CAST(timestamp AS DATE), sid)
            ) AS dates
        FROM df_deleted
    )
    SELECT
        explode_outer(dates) AS date,
        parent,
        child
    FROM transformed_input
    WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is correct
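# For reference, the correct output here works out to the following (derived from
# the query: sequence(0, datediff('2023-07-05', '2023-07-01') - 1) anchored at
# 2023-07-01):
# +----------+------+-----+
# |date      |parent|child|
# +----------+------+-----+
# |2023-07-01|p1    |c1   |
# |2023-07-02|p1    |c1   |
# |2023-07-03|p1    |c1   |
# |2023-07-04|p1    |c1   |
# +----------+------+-----+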
# ------------------------------
json_data_only_new_children = '''
[
    {
        "timestamp": "2023-07-01T08:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-07-09T08:00:00Z",
        "parent": "p2",
        "child": "c1",
        "status": "new"
    }
]
'''
# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_only_new_children])
# Read the JSON data into a DataFrame
df_only_new_children = spark.read.json(rdd)
# Show the DataFrame content
df_only_new_children.show()
df_only_new_children.createOrReplaceTempView("df_only_new_children") # the query below returns an incorrect result
sql_results = spark.sql("""
    WITH transformed_input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt,
            parent,
            child,
            status,
            transform(
                sequence(
                    0,
                    datediff(
                        CAST(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE),
                        CAST(timestamp AS DATE)
                    ) - 1
                ),
                sid -> date_add(CAST(timestamp AS DATE), sid)
            ) AS dates
        FROM df_only_new_children
    )
    SELECT
        explode_outer(dates) AS date,
        parent,
        child
    FROM transformed_input
    WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is not correct
# +----------+------+-----+
# |date |parent|child|
# +----------+------+-----+
# |2023-07-01|p1 |c1 |
# |2023-07-02|p2 |c1 |
# |2023-07-03|p2 |c1 |
# |2023-07-03|p2 |c1 |
# .
# .
# .
# |2023-08-08|p2 |c1 | (today's date)
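# One likely contributor to the wrong result: when a (parent, child) pair has a
# single row, datediff(max_ts, ts) - 1 evaluates to -1, and Spark's
# sequence(0, -1) returns the *descending* array [0, -1] rather than an empty
# one (sequence's default step is -1 when start > stop). A quick check:
spark.sql("SELECT sequence(0, -1) AS s").show()
# +-------+
# |      s|
# +-------+
# |[0, -1]|
# +-------+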
# ---------
json_data_multiple_combinations = '''
[
    {
        "timestamp": "2023-07-01T08:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-07-02T08:00:00Z",
        "parent": "p2",
        "child": "c1",
        "status": "deleted"
    },
    {
        "timestamp": "2023-07-04T08:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-07-05T08:00:00Z",
        "parent": "p2",
        "child": "c1",
        "status": "deleted"
    }
]
'''
# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_multiple_combinations])
# Read the JSON data into a DataFrame
df_json_data_multiple_combinations = spark.read.json(rdd)
# Show the DataFrame content
df_json_data_multiple_combinations.show()
df_json_data_multiple_combinations.createOrReplaceTempView("df_json_data_multiple_combinations")
sql_results = spark.sql("""
    WITH transformed_input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt,
            parent,
            child,
            status,
            transform(
                sequence(
                    0,
                    -- logic below added based on the gist comments
                    CASE WHEN first(status) OVER(PARTITION BY parent, child ORDER BY timestamp DESC) == 'existing'
                             THEN datediff(CAST(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE), CAST(timestamp AS DATE))
                         WHEN first(status) OVER(PARTITION BY parent, child ORDER BY timestamp DESC) == 'new'
                             THEN 0
                         ELSE datediff(
                                  CAST(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE),
                                  CAST(timestamp AS DATE)
                              ) - 1
                    END
                ),
                sid -> date_add(CAST(timestamp AS DATE), sid)
            ) AS dates
        FROM df_json_data_multiple_combinations
    )
    SELECT DISTINCT -- to remove duplicate entries in the dataset
        explode_outer(dates) AS date,
        parent,
        child
    FROM transformed_input
    WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is not correct
# expected:
# | date | parent | child |
# | ---------- | ------ | ----- |
# | 2023-07-01 | p1     | c1    |
# | 2023-07-02 | p1     | c1    |
# | 2023-07-04 | p1     | c1    |
# | 2023-07-05 | p1     | c1    |
# received
# +----------+------+-----+
# |date |parent|child|
# +----------+------+-----+
# |2023-07-04|p1 |c1 |
# |2023-07-01|p1 |c1 |
# +----------+------+-----+
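# A sketch of one way to avoid the negative sequence bound (this assumes clamping
# with greatest() is acceptable, and on its own does not reconcile the expected
# and received tables above): wrap the CASE expression so a lone 'new' row yields
# exactly its own date instead of a descending range, i.e.
# sequence(0, greatest(<bound>, 0)). For example:
spark.sql("""
    SELECT transform(
        sequence(0, greatest(-1, 0)),
        sid -> date_add(DATE'2023-07-01', sid)
    ) AS dates
""").show(truncate=False)
# +------------+
# |dates       |
# +------------+
# |[2023-07-01]|
# +------------+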
#------------
json_data_only_deleted = '''
[
    {
        "timestamp": "2023-07-11T12:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "deleted"
    }
]
'''
# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_only_deleted])
# Read the JSON data into a DataFrame
df_only_deleted = spark.read.json(rdd)
df_only_deleted.createOrReplaceTempView("df_only_deleted")
sql_results = spark.sql("""
    WITH input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt,
            parent,
            child,
            status,
            CAST(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE) AS max_ts,
            CASE WHEN LEAD(CAST(timestamp AS DATE), 1) OVER(PARTITION BY parent, child ORDER BY timestamp DESC) IS NULL
                     THEN CAST(timestamp AS DATE)
                 ELSE LEAD(CAST(timestamp AS DATE), 1) OVER(PARTITION BY parent, child ORDER BY timestamp DESC)
            END AS min_ts,
            row_number() OVER(PARTITION BY parent, child ORDER BY timestamp DESC) AS row_number
        FROM df_only_deleted
    ),
    transformed_input AS (
        SELECT
            dt,
            parent,
            child,
            max_ts,
            min_ts,
            row_number,
            CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new') THEN datediff(current_date, min_ts)
                 WHEN dt == max_ts AND status == 'deleted' THEN datediff(date_sub(max_ts, 1), min_ts)
            END AS new_date_diff,
            transform(
                sequence(
                    0,
                    CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new')
                             THEN datediff(current_date, min_ts)
                         WHEN dt == max_ts AND status == 'deleted'
                             THEN datediff(date_sub(max_ts, 1), min_ts)
                    END
                ),
                sid -> date_add(min_ts, sid)
            ) AS dates
        FROM input WHERE row_number = 1
    )
    SELECT DISTINCT
        explode_outer(dates) AS date,
        parent,
        child
    FROM transformed_input
""")
sql_results.show(1000, truncate=False)
# the result above is not correct
# expected:
# | date | parent | child |
# | ---------- | ------ | ----- |
# | 2023-07-08 | p1     | c1    |
# | 2023-07-09 | p1     | c1    |
# | 2023-07-10 | p1     | c1    |
# | 2023-07-11 | p1     | c1    |
# received
# +----------+------+-----+
# |date |parent|child|
# +----------+------+-----+
# |2023-07-11|p1 |c1 |
# |2023-07-10|p1 |c1 |
# +----------+------+-----+
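# The received pair above is consistent with sequence()'s descending behavior
# noted earlier: with a single 'deleted' row, min_ts = max_ts = 2023-07-11, so
# the upper bound is datediff(date_sub(max_ts, 1), min_ts) = -1 and
# sequence(0, -1) = [0, -1], which explodes to 2023-07-11 and 2023-07-10.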
@psrinuhp commented Aug 8, 2023:

scala> val mdata = """
     | [
     |     {
     |         "timestamp": "2023-08-01T12:00:00Z",
     |         "parent": "p1",
     |         "child": "c1",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T13:00:00Z",
     |         "parent": "p1",
     |         "child": "c1",
     |         "status": "existing"
     |     },
     |     {
     |         "timestamp": "2023-08-05T13:00:00Z",
     |         "parent": "p1",
     |         "child": "c1",
     |         "status": "existing"
     |     },
     |     {
     |         "timestamp": "2023-08-01T12:00:00Z",
     |         "parent": "p2",
     |         "child": "c2",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T13:00:00Z",
     |         "parent": "p2",
     |         "child": "c2",
     |         "status": "existing"
     |     },
     |     {
     |         "timestamp": "2023-08-05T13:00:00Z",
     |         "parent": "p2",
     |         "child": "c2",
     |         "status": "deleted"
     |     },
     |     {
     |         "timestamp": "2023-08-01T08:00:00Z",
     |         "parent": "p3",
     |         "child": "c3",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T13:00:00Z",
     |         "parent": "p3",
     |         "child": "c3",
     |         "status": "existing"
     |     },
     |     {
     |         "timestamp": "2023-08-01T14:00:00Z",
     |         "parent": "p3",
     |         "child": "c3",
     |         "status": "deleted"
     |     },
     |     {
     |         "timestamp": "2023-08-01T15:00:00Z",
     |         "parent": "p3",
     |         "child": "c3",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T08:00:00Z",
     |         "parent": "p4",
     |         "child": "c4",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-09T08:00:00Z",
     |         "parent": "p5",
     |         "child": "c1",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T12:00:00Z",
     |         "parent": "p6",
     |         "child": "c6",
     |         "status": "new"
     |     },
     |     {
     |         "timestamp": "2023-08-01T13:00:00Z",
     |         "parent": "p6",
     |         "child": "c6",
     |         "status": "existing"
     |     },
     |     {
     |         "timestamp": "2023-08-05T13:00:00Z",
     |         "parent": "p6",
     |         "child": "c6",
     |         "status": "existing"
     |     }
     | ]"""
scala> spark.read.json(Seq(mdata).toDS).createOrReplaceTempView("curd_data")

scala> spark.table("curd_data").show(false)
+-----+------+--------+--------------------+
|child|parent|status  |timestamp           |
+-----+------+--------+--------------------+
|c1   |p1    |new     |2023-08-01T12:00:00Z|
|c1   |p1    |existing|2023-08-01T13:00:00Z|
|c1   |p1    |existing|2023-08-05T13:00:00Z|
|c2   |p2    |new     |2023-08-01T12:00:00Z|
|c2   |p2    |existing|2023-08-01T13:00:00Z|
|c2   |p2    |deleted |2023-08-05T13:00:00Z|
|c3   |p3    |new     |2023-08-01T08:00:00Z|
|c3   |p3    |existing|2023-08-01T13:00:00Z|
|c3   |p3    |deleted |2023-08-01T14:00:00Z|
|c3   |p3    |new     |2023-08-01T15:00:00Z|
|c4   |p4    |new     |2023-08-01T08:00:00Z|
|c1   |p5    |new     |2023-08-09T08:00:00Z|
|c6   |p6    |new     |2023-08-01T12:00:00Z|
|c6   |p6    |existing|2023-08-01T13:00:00Z|
|c6   |p6    |existing|2023-08-05T13:00:00Z|
+-----+------+--------+--------------------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
    WITH transformed_input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt,
            parent,
            child,
            status,
            cast(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE) max_ts,
            transform(
                sequence(
                        0,
                        CASE WHEN first(status) OVER(PARTITION BY parent, child order by timestamp desc) == 'existing'
                            THEN datediff(current_date(), cast(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE))
                        WHEN first(status) OVER(PARTITION BY parent, child order by timestamp desc) == 'new'
                            THEN datediff(current_date(), cast(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE))
                        ELSE
                            datediff(
                                cast(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE) ,
                                cast(timestamp AS DATE)
                            ) - 1
                        END
                ),
                sid -> date_add(CAST(timestamp AS DATE), sid)
            ) as dates
        FROM curd_data
    )
    SELECT
        distinct
        explode_outer(dates) AS date,
        parent,
        child
    FROM transformed_input
    WHERE status not in ('existing', 'deleted')
""").show(1000, false)

// Exiting paste mode, now interpreting.

+----------+------+-----+
|date      |parent|child|
+----------+------+-----+
|2023-08-01|p1    |c1   |
|2023-08-02|p1    |c1   |
|2023-08-03|p1    |c1   |
|2023-08-04|p1    |c1   |
|2023-08-01|p2    |c2   |
|2023-08-02|p2    |c2   |
|2023-08-03|p2    |c2   |
|2023-08-04|p2    |c2   |
|2023-08-01|p3    |c3   |
|2023-08-02|p3    |c3   |
|2023-08-03|p3    |c3   |
|2023-08-04|p3    |c3   |
|2023-08-05|p3    |c3   |
|2023-08-06|p3    |c3   |
|2023-08-07|p3    |c3   |
|2023-08-08|p3    |c3   |
|2023-08-01|p4    |c4   |
|2023-08-02|p4    |c4   |
|2023-08-03|p4    |c4   |
|2023-08-04|p4    |c4   |
|2023-08-05|p4    |c4   |
|2023-08-06|p4    |c4   |
|2023-08-07|p4    |c4   |
|2023-08-08|p4    |c4   |
|2023-08-09|p5    |c1   |
|2023-08-08|p5    |c1   |
|2023-08-01|p6    |c6   |
|2023-08-02|p6    |c6   |
|2023-08-03|p6    |c6   |
|2023-08-04|p6    |c6   |
+----------+------+-----+



@yossisp (Author) commented Aug 8, 2023:

The above SQL doesn't work with the following input (presumably because, when the latest status is 'existing', the sequence bound is computed from max_ts but the dates are anchored at each row's own timestamp, so the generated range stops short of the current date):

[
    {
        "timestamp": "2023-07-01T12:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-07-01T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "existing"
    },
    {
        "timestamp": "2023-07-05T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "existing"
    }
]

@psrinuhp commented Aug 8, 2023:

val mdata = """
[
    {
        "timestamp": "2023-08-01T12:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "existing"
    },
    {
        "timestamp": "2023-08-05T13:00:00Z",
        "parent": "p1",
        "child": "c1",
        "status": "existing"
    },
    {
        "timestamp": "2023-08-01T12:00:00Z",
        "parent": "p2",
        "child": "c2",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T13:00:00Z",
        "parent": "p2",
        "child": "c2",
        "status": "existing"
    },
    {
        "timestamp": "2023-08-05T13:00:00Z",
        "parent": "p2",
        "child": "c2",
        "status": "deleted"
    },
    {
        "timestamp": "2023-08-01T08:00:00Z",
        "parent": "p3",
        "child": "c3",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T13:00:00Z",
        "parent": "p3",
        "child": "c3",
        "status": "existing"
    },
    {
        "timestamp": "2023-08-01T14:00:00Z",
        "parent": "p3",
        "child": "c3",
        "status": "deleted"
    },
    {
        "timestamp": "2023-08-01T15:00:00Z",
        "parent": "p3",
        "child": "c3",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T08:00:00Z",
        "parent": "p4",
        "child": "c4",
        "status": "new"
    },
    {
        "timestamp": "2023-08-08T08:00:00Z",
        "parent": "p5",
        "child": "c1",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T12:00:00Z",
        "parent": "p6",
        "child": "c6",
        "status": "new"
    },
    {
        "timestamp": "2023-08-01T13:00:00Z",
        "parent": "p6",
        "child": "c6",
        "status": "existing"
    },
    {
        "timestamp": "2023-08-05T13:00:00Z",
        "parent": "p6",
        "child": "c6",
        "status": "existing"
    }
]"""

spark.read.json(Seq(mdata).toDS).createOrReplaceTempView("curd_data")

spark.table("curd_data").show(false)

spark.sql("""WITH input AS (
        SELECT
            CAST(timestamp AS DATE) AS dt, 
            parent,
            child,
            status,
            cast(MAX(timestamp) OVER(PARTITION BY parent, child) AS DATE) as max_ts,
            CASE WHEN LEAD(CAST(timestamp AS DATE), 1) OVER(PARTITION BY parent, child order by timestamp desc) IS NULL 
                 THEN CAST(timestamp AS DATE) 
                 ELSE LEAD(CAST(timestamp AS DATE), 1) OVER(PARTITION BY parent, child order by timestamp desc) 
            END as min_ts,
            row_number() OVER(PARTITION BY parent, child order by timestamp desc) as row_number
        FROM curd_data
    ),
    transformed_input AS (
        SELECT 
            dt,
            parent,
            child,
            max_ts,
            min_ts,
            row_number,
            CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new') THEN datediff(current_date, min_ts)
                 WHEN dt == max_ts AND status == 'deleted'  THEN datediff(date_sub(max_ts, 1), min_ts)
            END AS new_date_diff ,
            transform(
                sequence(
                        0,
                        CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new')
                            THEN datediff(current_date, min_ts) 
                        WHEN dt == max_ts AND status == 'deleted'
                            THEN datediff(date_sub(max_ts, 1), min_ts)                        
                        END
                ),
                sid -> date_add(min_ts, sid)                              
            ) as dates 
        FROM input WHERE row_number = 1
    ) 
    SELECT 
        distinct
        explode_outer(dates) AS date,
        parent,
        child
    FROM 
    transformed_input
    """).show(100, false)

+-----+------+--------+--------------------+
|child|parent|status  |timestamp           |
+-----+------+--------+--------------------+
|c1   |p1    |new     |2023-08-01T12:00:00Z|
|c1   |p1    |existing|2023-08-01T13:00:00Z|
|c1   |p1    |existing|2023-08-05T13:00:00Z|

|c2   |p2    |new     |2023-08-01T12:00:00Z|
|c2   |p2    |existing|2023-08-01T13:00:00Z|
|c2   |p2    |deleted |2023-08-05T13:00:00Z|

|c3   |p3    |new     |2023-08-01T08:00:00Z|
|c3   |p3    |existing|2023-08-01T13:00:00Z|
|c3   |p3    |deleted |2023-08-01T14:00:00Z|
|c3   |p3    |new     |2023-08-01T15:00:00Z|

|c4   |p4    |new     |2023-08-01T08:00:00Z|

|c1   |p5    |new     |2023-08-08T08:00:00Z|

|c6   |p6    |new     |2023-08-01T12:00:00Z|
|c6   |p6    |existing|2023-08-01T13:00:00Z|
|c6   |p6    |existing|2023-08-05T13:00:00Z|
+-----+------+--------+--------------------+

+----------+------+-----+
|date      |parent|child|
+----------+------+-----+
|2023-08-01|p1    |c1   |
|2023-08-02|p1    |c1   |
|2023-08-03|p1    |c1   |
|2023-08-04|p1    |c1   |
|2023-08-05|p1    |c1   |
|2023-08-06|p1    |c1   |
|2023-08-07|p1    |c1   |
|2023-08-08|p1    |c1   |

|2023-08-01|p2    |c2   |
|2023-08-02|p2    |c2   |
|2023-08-03|p2    |c2   |
|2023-08-04|p2    |c2   | 

|2023-08-01|p3    |c3   | 
|2023-08-02|p3    |c3   |
|2023-08-03|p3    |c3   |
|2023-08-04|p3    |c3   |
|2023-08-05|p3    |c3   |
|2023-08-06|p3    |c3   |
|2023-08-07|p3    |c3   |
|2023-08-08|p3    |c3   |

|2023-08-01|p4    |c4   |
|2023-08-02|p4    |c4   |
|2023-08-03|p4    |c4   |
|2023-08-04|p4    |c4   |
|2023-08-05|p4    |c4   |
|2023-08-06|p4    |c4   |
|2023-08-07|p4    |c4   |
|2023-08-08|p4    |c4   |

|2023-08-08|p5    |c1   |

|2023-08-01|p6    |c6   |
|2023-08-02|p6    |c6   |
|2023-08-03|p6    |c6   |
|2023-08-04|p6    |c6   |
|2023-08-05|p6    |c6   |
|2023-08-06|p6    |c6   |
|2023-08-07|p6    |c6   |
|2023-08-08|p6    |c6   |

+----------+------+-----+
