Skip to content

Instantly share code, notes, and snippets.

@Ugbot
Last active August 3, 2023 20:33
Show Gist options
  • Save Ugbot/1503a67a921cf5d1eb12aabb73efb8a1 to your computer and use it in GitHub Desktop.
Save Ugbot/1503a67a921cf5d1eb12aabb73efb8a1 to your computer and use it in GitHub Desktop.
-- Flink SQL for sorting events by time and dead-lettering late events.
-- Source table: JSON events consumed from the Kafka topic 'input_topic'.
-- Missing JSON fields become NULL instead of failing the job, and parse
-- errors are ignored rather than raised (see the 'json.*' options).
CREATE TABLE kafka_input (
    id              INT,
    event_timestamp STRING,   -- raw timestamp text as produced upstream
    message         STRING,
    -- Processing-time attribute (wall clock of the Flink task).
    proctime  AS PROCTIME(),
    -- Event-time attribute parsed from event_timestamp with TO_TIMESTAMP's
    -- default pattern. NOTE(review): confirm producers send that format.
    eventtime AS TO_TIMESTAMP(event_timestamp),
    -- Bounded out-of-orderness: the watermark lags event time by 1 second.
    WATERMARK FOR eventtime AS eventtime - INTERVAL '1' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'input_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format'                       = 'json',
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'true'
);
-- Sink table for late / dead-lettered events, written as JSON to the Kafka
-- topic 'output_topic_late'. Columns mirror the raw fields of kafka_input.
CREATE TABLE kafka_output_late (
    id              INT,
    event_timestamp STRING,
    message         STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'output_topic_late',
    -- Was '' (an empty broker list the producer cannot connect to); use the
    -- same cluster the kafka_input source reads from.
    'properties.bootstrap.servers' = 'localhost:9092',
    'format'                       = 'json'
);
-- Sink table for on-time events emitted in event-time order, written as JSON
-- to the Kafka topic 'output_topic_sorted'. Columns mirror the raw fields of
-- kafka_input.
CREATE TABLE kafka_output_sorted (
    id              INT,
    event_timestamp STRING,
    message         STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'output_topic_sorted',
    -- Was '' (an empty broker list the producer cannot connect to); use the
    -- same cluster the kafka_input source reads from.
    'properties.bootstrap.servers' = 'localhost:9092',
    'format'                       = 'json'
);
-- Dead-letter queue: route events that cannot be handled on time.
--   * Late events: eventtime at or behind the current event-time watermark
--     declared on kafka_input (eventtime - 1 second). Previously this query
--     windowed on processing time and selected eventtime > window_end, which
--     picks events whose timestamp is AHEAD of processing time, not late
--     ones, and never consulted the watermark at all.
--   * Events with no usable timestamp (eventtime IS NULL, e.g. a missing
--     event_timestamp field). These matched neither INSERT before and were
--     silently dropped.
-- CURRENT_WATERMARK (Flink 1.15+) returns NULL until a watermark is
-- available; the comparison is then UNKNOWN, so no row is classed as late
-- before the first watermark.
INSERT INTO kafka_output_late
SELECT
    id,
    event_timestamp,
    message
FROM kafka_input
WHERE eventtime IS NULL
   OR eventtime <= CURRENT_WATERMARK(eventtime);
-- Emit events to kafka_output_sorted in ascending event-time order.
-- The previous statement could not be planned:
--   * the derived table had no alias, no outer row_num filter, and no ';';
--   * it used an unfiltered ROW_NUMBER() ordered DESC;
--   * a regular Top-N would produce retractions that an append-only Kafka
--     sink cannot accept.
-- In streaming mode Flink only supports ORDER BY whose primary key is an
-- ascending time attribute. Rows are buffered until the watermark passes
-- them and then emitted in order. id is a secondary key that makes the order
-- deterministic among rows with equal eventtime.
-- NOTE(review): the sort operator is expected to discard rows already behind
-- the watermark; those are routed to kafka_output_late. Confirm for your
-- Flink version.
INSERT INTO kafka_output_sorted
SELECT
    id,
    event_timestamp,
    message
FROM kafka_input
-- A row without an event time cannot be placed in event-time order.
WHERE eventtime IS NOT NULL
ORDER BY
    eventtime ASC,
    id ASC;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment