Skip to content

Instantly share code, notes, and snippets.

@rodesai
Last active December 9, 2021 08:42
Show Gist options
  • Save rodesai/0dca465bd6b2aa45e3b8d9d21eb1aa76 to your computer and use it in GitHub Desktop.
Save rodesai/0dca465bd6b2aa45e3b8d9d21eb1aa76 to your computer and use it in GitHub Desktop.
KSQL INSERT INTO Example
CREATE STREAM SALES_ONLINE (ITEMID BIGINT, CUSTOMERID BIGINT, PRICE INTEGER, PAYMENT_METHOD INTEGER, SHIPMENTID BIGINT) WITH (KAFKA_TOPIC='sales-online', VALUE_FORMAT='json');
CREATE STREAM SALES_INSTORE (ITEMID BIGINT, CUSTOMERID BIGINT, PRICE INTEGER, PAYMENT_METHOD INTEGER, STOREID BIGINT) WITH (KAFKA_TOPIC='sales-instore', VALUE_FORMAT='json');
CREATE STREAM ALL_SALES (ITEMID BIGINT, CUSTOMERID BIGINT, PRICE INTEGER, PAYMENT_METHOD INTEGER) WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='json');
INSERT INTO ALL_SALES SELECT ITEMID, CUSTOMERID, PRICE, PAYMENT_METHOD FROM SALES_ONLINE;
INSERT INTO ALL_SALES SELECT ITEMID, CUSTOMERID, PRICE, PAYMENT_METHOD FROM SALES_INSTORE;
CREATE TABLE DAILY_SALES_PER_ITEM AS SELECT ITEMID, SUM(PRICE) FROM ALL_SALES WINDOW TUMBLING (SIZE 1 DAY) GROUP BY ITEMID;
@asifakb
Copy link

asifakb commented Sep 6, 2018

Is it required that KAFKA_TOPIC='sales' should be manually created prior creating ALL_SALES stream? Can we provide Key attribute for ALL SALES stream at this stage ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment