@1ambda
Created December 21, 2021 23:27
spark.sql("""
-- GROUPED: one row per date with the distinct product ids purchased for the brand
WITH GROUPED as (
    SELECT CAST(event_time AS DATE) as event_date, collect_set(product_id) as product_id_list
    FROM PURCHASE
    WHERE brand = 'beautix'
    GROUP BY event_date
),
-- WINDOWED: pair each date's product set with the set from the previous purchase date
WINDOWED as (
    SELECT
        event_date,
        product_id_list as products_current,
        lag(product_id_list, 1) OVER (ORDER BY event_date ASC) as products_prev
    FROM GROUPED
),
CALCULATED as (
    SELECT
        event_date,
        products_current,
        products_prev,
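        -- products_prev is NULL on the first date, so products_common is NULL there too;
        -- size(NULL) is -1 or NULL depending on spark.sql.legacy.sizeOfNull / ANSI mode
        array_intersect(products_current, products_prev) as products_common,
        size(array_intersect(products_current, products_prev)) as products_common_size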
    FROM WINDOWED
)
SELECT *
FROM CALCULATED
ORDER BY event_date ASC
""").show()