@panasenco
Last active June 18, 2020 20:55
Processing a multi-stage request in ksqlDB

Suppose there's a stream of multi-stage requests for documents with given IDs: each document is to be retrieved from source, contrast-adjusted, OCRed, word-counted, placed in a destination directory, and so on (the exact stages can vary from request to request). The stages have to run in order, and intermediate results in the cache should be reused so that the same document isn't OCRed over and over. I'd like to derive a stream of requests for each individual application (document retrieval app, OCR app, etc.). How can I do that with ksqlDB?


Let's represent both requests and products as arrays of the steps it takes to create them. In the worst case, each step can carry custom information, so the representation should be of type ARRAY<MAP<STRING,STRING>>. Using an array as a key is not currently supported in ksqlDB, but we can 'cheat' by casting the array to a string and using that string as the key.
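As a rough illustration of this keying scheme, here is a minimal Python sketch (not ksqlDB). It assumes json.dumps as a stand-in for CAST(stages AS STRING), whose actual output format in ksqlDB will differ; key_of and prereq_of are hypothetical helper names, and the sample stages are made up.

```python
import json


def key_of(stages):
    # Stand-in for CAST(stages AS STRING): any deterministic string works as a key.
    # sort_keys makes maps with the same entries produce the same key.
    return json.dumps(stages, sort_keys=True, separators=(",", ":"))


def prereq_of(stages):
    # The prerequisite of a request is the same array without its last stage.
    return stages[:-1]


# Hypothetical two-stage request: retrieve document 42, then OCR it.
request = [{"step": "retrieve", "doc": "42"}, {"step": "ocr"}]
request_id = key_of(request)
prereq_id = key_of(prereq_of(request))

print(request_id)
print(prereq_id)
```

A single-stage request has an empty prerequisite, whose key in this sketch is the string '[]'.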

Let's break down the logic first:

  • Any new request should trigger requests for all of its prerequisites.
  • Any new request that matches an existing product should be silently ignored.
  • Any new request without a matching product but with a matching prerequisite product should be forwarded to the application tasks stream.
  • Any new product that is the prerequisite of a request should trigger the request to be forwarded to the application tasks stream.
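
The last three rules above can be sketched as two small decision functions. This is a simplified Python model, not ksqlDB: each request is assumed to be a tuple of stage names, products is a set of such tuples, and on_request, on_product, and prerequisite are hypothetical names. The first rule (requesting the prerequisites) is left out here.

```python
def prerequisite(request):
    # Everything except the last stage.
    return request[:-1]


def on_request(request, products):
    """Decide what to do with a new request given the completed products."""
    if request in products:
        return "ignore"      # a matching product already exists
    prereq = prerequisite(request)
    if not prereq or prereq in products:
        return "forward"     # nothing to wait for: send to the tasks stream
    return "wait"            # forwarded later, when the prerequisite completes


def on_product(product, pending):
    """Return the pending requests that a new product unblocks."""
    return [r for r in pending if prerequisite(r) == product]


products = {("retrieve",)}
print(on_request(("retrieve",), products))
print(on_request(("retrieve", "ocr"), products))
print(on_request(("retrieve", "ocr", "count"), products))
```

A request with a single stage has an empty prerequisite, so this sketch forwards it immediately unless it is already a product.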

First, let's put together our request streams and tables.

CREATE STREAM requests (stages ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='requests', value_format='json', partitions=1);

CREATE STREAM keyed_requests WITH (kafka_topic='keyed_requests', value_format='json', partitions=1) AS
SELECT
  CAST(stages AS STRING) AS request_id,
  CAST(SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS STRING) AS prereq_id,
  stages,
  SLICE(stages,1,ARRAY_LENGTH(stages)-1) AS prereq
FROM requests
PARTITION BY CAST(stages AS STRING);

INSERT INTO requests
SELECT
  prereq AS stages
FROM keyed_requests
WHERE ARRAY_LENGTH(prereq) > 0;

CREATE TABLE tbl_requests (ROWKEY STRING PRIMARY KEY, request_id STRING, prereq_id STRING, stages ARRAY<MAP<STRING,STRING>>, prereq ARRAY<MAP<STRING,STRING>>) WITH (kafka_topic='keyed_requests', value_format='json');

CREATE TABLE prereq_requests AS SELECT prereq_id, COLLECT_SET(request_id) AS requests FROM keyed_requests GROUP BY prereq_id;
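
The INSERT INTO requests statement above feeds each request's prerequisite back into the requests stream, so a single request ends up requesting every shorter prefix of itself. A minimal Python sketch of that expansion (expand is a hypothetical name, and the stage names are made up):

```python
def expand(stages):
    """Yield the request followed by each of its non-empty prefixes, longest first."""
    current = list(stages)
    while current:
        yield current
        # Mirrors SLICE(stages, 1, ARRAY_LENGTH(stages) - 1): drop the last stage.
        current = current[:-1]


for request in expand(["retrieve", "ocr", "count"]):
    print(request)
```

The loop stops at the empty array, which corresponds to the WHERE ARRAY_LENGTH(prereq) > 0 filter, so a request with n stages yields n records in this model.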

Next, let's create the stream/table of completed products:

CREATE STREAM products (ROWKEY STRING KEY) WITH (kafka_topic='products', value_format='json', partitions=1);
CREATE TABLE tbl_products (ROWKEY STRING PRIMARY KEY) WITH (kafka_topic='products', value_format='json');

Now, let's create a new stream for requests with completed prerequisites:

CREATE STREAM completed_prereq_requests (ROWKEY STRING KEY, request_id STRING) WITH (kafka_topic='completed_prereq_requests', value_format='json', partitions=1);

Let's insert new requests into the stream if they have no prerequisite or their prerequisite is already a product:

INSERT INTO completed_prereq_requests
SELECT
  request_id
FROM keyed_requests
LEFT JOIN tbl_products ON keyed_requests.prereq_id = tbl_products.ROWKEY
WHERE keyed_requests.prereq_id = '[]' OR tbl_products.ROWKEY IS NOT NULL
EMIT CHANGES;

Let's also insert the requests that were waiting on each newly completed product:

INSERT INTO completed_prereq_requests
SELECT
  EXPLODE(prereq_requests.requests) AS request_id
FROM products
INNER JOIN prereq_requests ON products.ROWKEY = prereq_requests.ROWKEY
EMIT CHANGES;

Now we have a stream of IDs of requests with completed prerequisites, but it still includes requests that have already been completed themselves. In addition, the request arrays are not in this stream. Let's filter out the completed requests and enrich the rest with their stage arrays in a new stream, keeping only the last stage as the task:

CREATE STREAM tasks AS
SELECT
  tbl_requests.stages[ARRAY_LENGTH(tbl_requests.stages)] AS task
FROM completed_prereq_requests
LEFT JOIN tbl_requests ON completed_prereq_requests.request_id = tbl_requests.ROWKEY
LEFT JOIN tbl_products ON completed_prereq_requests.request_id = tbl_products.ROWKEY
WHERE tbl_products.ROWKEY IS NULL
PARTITION BY completed_prereq_requests.request_id;

To test this out, run SELECT * FROM tasks EMIT CHANGES; in one ksqlDB window. Then open another ksqlDB window, create a request, and start producing products. Watch what happens in the first window.

INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p':='x')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p':='x'),MAP('r':='y2')]);
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2')] AS STRING));
INSERT INTO requests (stages) VALUES (ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y'),MAP('n':='z2')]);
INSERT INTO products (ROWKEY) VALUES (CAST(ARRAY[MAP('p1':='x1','p2':='x2'),MAP('r':='y')] AS STRING));
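
To reason about what the tasks stream should show, the inserts above can be replayed through an idealized in-memory model. This is a Python sketch under several assumptions, not a statement of what ksqlDB prints: events are processed strictly in order, each stage map is abbreviated to a string such as "p=x", and products, waiting, tasks, add_request, and add_product are hypothetical names. Real ksqlDB ordering and timing may differ.

```python
from collections import defaultdict

products = set()              # completed products, keyed by stage tuple
waiting = defaultdict(list)   # prerequisite -> requests that depend on it
tasks = []                    # last stage of every forwarded request


def forward(request):
    # Like the tasks stream: skip requests that are already products.
    if request not in products:
        tasks.append(request[-1])


def add_request(stages):
    request = tuple(stages)
    while request:            # the request itself, then each shorter prefix
        prereq = request[:-1]
        if request not in waiting[prereq]:
            waiting[prereq].append(request)
        if not prereq or prereq in products:
            forward(request)
        request = prereq


def add_product(stages):
    product = tuple(stages)
    products.add(product)
    for request in list(waiting[product]):
        forward(request)


add_request(["p=x", "r=y"])
add_product(["p=x"])
add_request(["p=x", "r=y2"])
add_request(["p1=x1,p2=x2", "r=y", "n=z"])
add_product(["p1=x1,p2=x2"])
add_request(["p1=x1,p2=x2", "r=y", "n=z2"])
add_product(["p1=x1,p2=x2", "r=y"])
print(tasks)
```

In this model the r=y task of the second document is emitted twice: once when its prerequisite product arrives, and again when the sixth insert re-requests that prefix while it is still not a product.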