Suppose you want to insert values from one ksqlDB stream into another while auto-incrementing some integer value in the destination stream.
First, create the two streams:
CREATE STREAM dest (ROWKEY INT KEY, i INT, x INT) WITH (kafka_topic='test_dest', value_format='json', partitions=1);
CREATE STREAM src (x INT) WITH (kafka_topic='test_src', value_format='json', partitions=1);
Next, create a materialized view that holds the maximum value of i seen so far in the destination stream:
CREATE TABLE dest_maxi AS SELECT MAX(i) AS i FROM dest GROUP BY 1;
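As a quick sanity check (a sketch that is not part of the original steps, assuming the statements above ran successfully), a push query on the table shows its current maximum as it changes. It should print nothing until dest has received at least one row.

```sql
-- Stream every update to the single-row aggregate; stop it with Ctrl+C in the CLI.
SELECT * FROM dest_maxi EMIT CHANGES;
```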
We need to be able to join the source stream to the materialized view. To do so, we'll create another intermediate stream with a dummy column, one, that's always set to 1, which is the value we grouped the materialized view on:
CREATE STREAM src_one AS SELECT x, 1 AS one FROM src;
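Finally, insert into dest by left-joining src_one to dest_maxi. The COALESCE supplies 0 while dest is still empty, so the first row gets i = 1:
INSERT INTO dest SELECT COALESCE(dest_maxi.i,0)+1 AS i, src_one.x AS x FROM src_one LEFT JOIN dest_maxi ON src_one.one = dest_maxi.ROWKEY PARTITION BY COALESCE(dest_maxi.i,0)+1 EMIT CHANGES;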
Now you can insert values into stream src and watch them come up in stream dest with auto-incrementing IDs.
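One way to try this out is sketched below. The sample values are arbitrary, and the offset setting is an assumption about how you want to read the topic (from the beginning rather than only new records); neither comes from the original steps.

```sql
-- Read existing records too, so rows written before the query starts are shown.
SET 'auto.offset.reset' = 'earliest';

-- Insert a few sample rows into the source stream (values are arbitrary).
INSERT INTO src (x) VALUES (10);
INSERT INTO src (x) VALUES (20);
INSERT INTO src (x) VALUES (30);

-- Push query: print rows from dest as they arrive; stop it with Ctrl+C in the CLI.
SELECT i, x FROM dest EMIT CHANGES;
```

Because each new ID is read from dest_maxi, which is itself updated from dest, it may be worth leaving a short pause between inserts when experimenting, so the view has time to pick up the previous row.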