-- Source definition: f1_stream
--
-- Registers (only if it does not already exist) a streaming source named
-- f1_stream that reads JSON-encoded messages from the Kafka topic 'F1Topic'
-- on the broker at 127.0.0.1:9092, starting from the earliest offset.
--
-- NOTE(review): `ROW FORMAT JSON` looks like the older format clause; newer
-- engine releases appear to spell this `FORMAT PLAIN ENCODE JSON`. Confirm
-- against the target engine version before changing it.
CREATE SOURCE IF NOT EXISTS f1_stream (
    race_id   INT,
    driver_id INT,
    lap       INT,
    pos       INT,
    -- Declared as text, not a timestamp type.
    -- Presumably an event time carried as a string; verify the format with
    -- the producer before casting it downstream.
    ts        STRING
)
WITH (
    connector                   = 'kafka',
    topic                       = 'F1Topic',
    properties.bootstrap.server = '127.0.0.1:9092',
    scan.startup.mode           = 'earliest'
)
ROW FORMAT JSON;