Last active
December 9, 2021 08:42
-
-
Save rodesai/0dca465bd6b2aa45e3b8d9d21eb1aa76 to your computer and use it in GitHub Desktop.
KSQL INSERT INTO Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Declares the SALES_ONLINE stream over the 'sales-online' Kafka topic,
-- reading record values as JSON.
-- SHIPMENTID is the only column here that SALES_INSTORE does not also declare.
-- NOTE(review): PAYMENT_METHOD is an INTEGER, presumably a coded value — confirm
-- the code table with the producer of this topic.
CREATE STREAM SALES_ONLINE (
    ITEMID BIGINT,
    CUSTOMERID BIGINT,
    PRICE INTEGER,
    PAYMENT_METHOD INTEGER,
    SHIPMENTID BIGINT
) WITH (
    KAFKA_TOPIC='sales-online',
    VALUE_FORMAT='json'
);
-- Declares the SALES_INSTORE stream over the 'sales-instore' Kafka topic,
-- reading record values as JSON.
-- STOREID is the only column here that SALES_ONLINE does not also declare.
CREATE STREAM SALES_INSTORE (
    ITEMID BIGINT,
    CUSTOMERID BIGINT,
    PRICE INTEGER,
    PAYMENT_METHOD INTEGER,
    STOREID BIGINT
) WITH (
    KAFKA_TOPIC='sales-instore',
    VALUE_FORMAT='json'
);
-- Declares the ALL_SALES stream over the 'sales' Kafka topic (JSON values).
-- Its schema is the four columns that SALES_ONLINE and SALES_INSTORE have in
-- common; the channel-specific SHIPMENTID / STOREID columns are not carried.
-- NOTE(review): no PARTITIONS property is given, so the 'sales' topic
-- presumably has to exist before this statement runs — confirm against the
-- CREATE STREAM documentation for the ksqlDB version in use.
CREATE STREAM ALL_SALES (
    ITEMID BIGINT,
    CUSTOMERID BIGINT,
    PRICE INTEGER,
    PAYMENT_METHOD INTEGER
) WITH (
    KAFKA_TOPIC='sales',
    VALUE_FORMAT='json'
);
-- Feed ALL_SALES from both per-channel streams. Each query projects only the
-- four columns ALL_SALES declares, in the same order as its schema.

-- Online sales -> ALL_SALES (SHIPMENTID is not selected).
INSERT INTO ALL_SALES
SELECT
    ITEMID,
    CUSTOMERID,
    PRICE,
    PAYMENT_METHOD
FROM SALES_ONLINE;

-- In-store sales -> ALL_SALES (STOREID is not selected).
INSERT INTO ALL_SALES
SELECT
    ITEMID,
    CUSTOMERID,
    PRICE,
    PAYMENT_METHOD
FROM SALES_INSTORE;
-- Materializes DAILY_SALES_PER_ITEM: the sum of PRICE per ITEMID from
-- ALL_SALES, computed over tumbling windows of one day.
-- NOTE(review): SUM(PRICE) has no alias, so the output column gets an
-- engine-generated name. Adding an explicit alias would be clearer but renames
-- the column for any existing consumer — confirm before changing.
CREATE TABLE DAILY_SALES_PER_ITEM AS
SELECT
    ITEMID,
    SUM(PRICE)
FROM ALL_SALES
WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY ITEMID;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is it required that the Kafka topic 'sales' (KAFKA_TOPIC='sales') be created manually before creating the ALL_SALES stream? Can we provide a KEY attribute for the ALL_SALES stream at this stage?