Created
July 11, 2024 10:00
-
-
Save Ugbot/2d2659512c62265dcaaf5d84d2e60b7a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Source table: payment events read as JSON from the Kafka topic 'payments'.
-- `timestamp` is a reserved keyword in Flink SQL, so it must be backtick-quoted
-- wherever it is used as a column name. The name itself is kept so the JSON
-- field mapping stays unchanged.
CREATE TABLE payments (
    payment_id  STRING,
    user_id     STRING,
    amount      DECIMAL(10, 2),
    `timestamp` TIMESTAMP(3),
    -- Declares `timestamp` as the event-time attribute; the watermark trails
    -- the observed event time by 5 seconds.
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'payments',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Source table: basket item events read as JSON from the Kafka topic 'baskets'.
-- `timestamp` is a reserved keyword in Flink SQL, so it must be backtick-quoted
-- wherever it is used as a column name. The name itself is kept so the JSON
-- field mapping stays unchanged.
CREATE TABLE baskets (
    basket_id   STRING,
    user_id     STRING,
    item        STRING,
    `timestamp` TIMESTAMP(3),
    -- Declares `timestamp` as the event-time attribute; the watermark trails
    -- the observed event time by 5 seconds.
    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'baskets',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Sink table: one row per matched (payment, basket item) pair, written as JSON
-- to the Kafka topic 'joined_results'. Column order must match the SELECT list
-- of the INSERT statement that populates this table.
CREATE TABLE joined_results (
    payment_id        STRING,
    basket_id         STRING,
    user_id           STRING,
    payment_amount    DECIMAL(10, 2),
    item              STRING,
    payment_timestamp TIMESTAMP(3),
    basket_timestamp  TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'joined_results',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Interval join: pair each payment with the basket rows of the same user whose
-- event time lies within 5 minutes before or after the payment time (bounds
-- inclusive, i.e. a 10-minute span centred on the basket timestamp).
-- `timestamp` is a reserved keyword in Flink SQL and therefore backtick-quoted.
INSERT INTO joined_results
SELECT
    p.payment_id,
    b.basket_id,
    p.user_id,
    p.amount      AS payment_amount,
    b.item,
    p.`timestamp` AS payment_timestamp,
    b.`timestamp` AS basket_timestamp
FROM payments AS p
INNER JOIN baskets AS b
    ON p.user_id = b.user_id
    AND p.`timestamp` BETWEEN b.`timestamp` - INTERVAL '5' MINUTE
                          AND b.`timestamp` + INTERVAL '5' MINUTE;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment