Skip to content

Instantly share code, notes, and snippets.

@panasenco
Last active June 16, 2020 20:54
Show Gist options
  • Save panasenco/2e2f0c846c9d40dd8350da57385c8d59 to your computer and use it in GitHub Desktop.
Save panasenco/2e2f0c846c9d40dd8350da57385c8d59 to your computer and use it in GitHub Desktop.
Creating an auto-incrementing column in ksqlDB

Suppose you want to insert values from one ksqlDB stream into another while auto-incrementing some integer value in the destination stream.

First, create the two streams:

CREATE STREAM dest (ROWKEY INT KEY, i INT, x INT) WITH (kafka_topic='test_dest', value_format='json', partitions=1);
CREATE STREAM src (x INT) WITH (kafka_topic='test_src', value_format='json', partitions=1);

Next, create a materialized view that will contain the maximum value of the destination stream.

CREATE TABLE dest_maxi AS SELECT MAX(i) AS i FROM dest GROUP BY 1;

We need to be able to join the source stream to the materialized view. To do so, we'll create another intermediate stream with a dummy one column that's always set to 1, which is what we grouped the materialized view on:

CREATE STREAM src_one AS SELECT x, 1 AS one FROM src;
INSERT INTO dest SELECT COALESCE(dest_maxi.i,0)+1 AS i, src_one.x AS x FROM src_one LEFT JOIN dest_maxi ON src_one.one = dest_maxi.ROWKEY PARTITION BY COALESCE(dest_maxi.i,0)+1 EMIT CHANGES;

Now you can insert values into stream src and watch them come up in stream dest with auto-incrementing IDs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment