Last active
August 3, 2023 20:33
-
-
Save Ugbot/1503a67a921cf5d1eb12aabb73efb8a1 to your computer and use it in GitHub Desktop.
Flink SQL for sorting events within time windows and dead-lettering late events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Source table: JSON events read from the Kafka topic 'input_topic'.
--   event_timestamp is the event time as a string. eventtime parses it with
--   TO_TIMESTAMP's default pattern 'yyyy-MM-dd HH:mm:ss'.
--   NOTE(review): confirm producers emit exactly that pattern. A non-matching
--   string presumably yields a NULL eventtime rather than failing - verify.
--   The watermark trails eventtime by 1 second, so events up to 1 second out of
--   order are still treated as on time downstream.
--   json.ignore-parse-errors skips malformed fields/rows instead of failing.
CREATE TABLE kafka_input (
    id INT,
    event_timestamp STRING,
    message STRING,
    proctime AS PROCTIME(),
    eventtime AS TO_TIMESTAMP(event_timestamp),
    WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- group-offsets (the connector default) resumes from committed offsets.
    -- With no reset policy, a consumer group that has no committed offsets
    -- fails at startup, so fall back to the start of the topic in that case.
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
-- Sink table (dead-letter topic) for late events. It carries the input's raw
-- id / event_timestamp / message columns unchanged, serialized as JSON.
CREATE TABLE kafka_output_late (
    id INT,
    event_timestamp STRING,
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic_late',
    -- Same cluster as kafka_input. An empty broker list is rejected by the
    -- Kafka client, so the sink could never connect.
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Sink table for on-time events, written in event-time order. It carries the
-- input's raw id / event_timestamp / message columns unchanged, serialized as
-- JSON.
CREATE TABLE kafka_output_sorted (
    id INT,
    event_timestamp STRING,
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic_sorted',
    -- Same cluster as kafka_input. An empty broker list is rejected by the
    -- Kafka client, so the sink could never connect.
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Split kafka_input into late and on-time events using Flink's own notion of
-- lateness. A row is late when its eventtime is at or below the current
-- watermark of eventtime, which kafka_input declares as eventtime - 1 second.
-- CURRENT_WATERMARK returns NULL until a watermark exists. The two predicates
-- below are exact complements, so such rows count as on time.
-- CURRENT_WATERMARK requires Flink 1.14 or later.
-- NOTE(review): submitted as two separate INSERTs, these run as two jobs with
-- independent watermarks. Consider wrapping them in one STATEMENT SET so both
-- branches judge lateness against the same watermark.

-- Dead-letter the late events.
INSERT INTO kafka_output_late
SELECT id, event_timestamp, message
FROM kafka_input
WHERE CURRENT_WATERMARK(eventtime) IS NOT NULL
  AND eventtime <= CURRENT_WATERMARK(eventtime);

-- Emit on-time events sorted by event time. In streaming mode, Flink supports
-- ORDER BY only when the first sort key is an ascending rowtime attribute. It
-- buffers rows and releases them in eventtime order as the watermark advances.
-- NOTE(review): confirm with EXPLAIN that this plans as a temporal sort on the
-- deployed Flink version.
INSERT INTO kafka_output_sorted
SELECT id, event_timestamp, message
FROM kafka_input
WHERE CURRENT_WATERMARK(eventtime) IS NULL
   OR eventtime > CURRENT_WATERMARK(eventtime)
ORDER BY eventtime;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment