# KSQLDB Simple Tutorial with Confluent Cloud
tags: #ksqldb/tutorial

## Provisioning ksqlDB Cluster

### Cluster Data

```shell
environment=env-0x3135
kafka_cluster=lkc-nv010v
```

### Provision Schema Registry

```shell
confluent schema-registry cluster enable --cloud gcp --geo eu --environment env-0x3135
+--------------+-----------------------------------------------------+
| Id           | lsrc-k802op                                         |
| Endpoint URL | https://psrc-kk5gg.europe-west3.gcp.confluent.cloud |
+--------------+-----------------------------------------------------+
```
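The same details can be pulled up again later; a quick check, assuming the `describe` subcommand is available in your CLI version:

```shell
# Show the Schema Registry cluster for this environment (ID, endpoint)
confluent schema-registry cluster describe --environment env-0x3135
```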


### Service Account
```shell
confluent iam service-account create siemensSA --description "Siemens Demo Service Account"
+-------------+------------------------------+
| ID          | sa-1jn0v3                    |
| Name        | siemensSA                    |
| Description | Siemens Demo Service Account |
+-------------+------------------------------+
```

### API Key for SA

```shell
confluent api-key create --service-account sa-1jn0v3 --environment env-0x3135 --resource lkc-nv010v --description "SiemensSA key for Kafka Cluster"
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | CKFNMATDK2HDX43B                                                 |
| Secret  | voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5 |
+---------+------------------------------------------------------------------+
```
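To confirm which resource the key is bound to, the service account's keys can be listed; a quick check, assuming these filter flags exist in your CLI version:

```shell
# List API keys owned by the service account, scoped to the Kafka cluster
confluent api-key list --service-account sa-1jn0v3 --resource lkc-nv010v
```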

### Create ksqlDB Cluster with CLI

```shell
confluent ksql cluster create siemens_ksqldb_demo --csu 2 --environment env-0x3135 --cluster lkc-nv010v --api-key CKFNMATDK2HDX43B --api-secret voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5
+--------------+-----------------------------------------------------------+
| Id           | lksqlc-120kk5                                             |
| Name         | siemens_ksqldb_demo                                       |
| Topic Prefix | pksqlc-75wq1                                              |
| Kafka        | lkc-nv010v                                                |
| Storage      |                                                       250 |
| Endpoint     | https://pksqlc-75wq1.europe-west3.gcp.confluent.cloud:443 |
| Status       | PROVISIONING                                              |
+--------------+-----------------------------------------------------------+
```
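Provisioning takes a few minutes; the status can be polled until it reports `UP`, assuming `describe` is available in the same command family used above:

```shell
# Check the provisioning status of the new ksqlDB cluster
confluent ksql cluster describe lksqlc-120kk5 --environment env-0x3135
```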
Grant the ksqlDB cluster access to the Kafka cluster:

```shell
confluent ksql cluster configure-acls lksqlc-120kk5 --environment env-0x3135 --cluster lkc-nv010v
```

Allow the service account to manage topics:

```shell
confluent kafka acl create --service-account sa-1jn0v3 --cluster lkc-nv010v --allow --operation read --operation write --operation create --operation delete --topic "*"
```
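The resulting bindings can be verified; a quick check, assuming the `--service-account` filter in your CLI version:

```shell
# Show the ACLs now attached to the service account on this cluster
confluent kafka acl list --service-account sa-1jn0v3 --cluster lkc-nv010v
```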

## Tables and Streams

- Create a STREAM and see how it translates to a topic:

```sql
CREATE STREAM users_demo (
    id INTEGER KEY,
    gender STRING,
    name STRING,
    age INTEGER
) WITH (
    kafka_topic='users_demo',
    partitions=1,
    value_format='JSON_SR');
```
- Insert some data:

```sql
INSERT INTO users_demo (id, gender, name, age) VALUES (0, 'female', 'sarah', 42);
INSERT INTO users_demo (id, gender, name, age) VALUES (1, 'male', 'john', 28);
INSERT INTO users_demo (id, gender, name, age) VALUES (42, 'female', 'jessica', 70);
```
- Check the content of the STREAM:

```sql
-- Remember to set 'auto.offset.reset' to 'earliest' first
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM USERS_DEMO EMIT CHANGES;
```

- Check the content of the TOPIC and the schema registered in SCHEMA-REGISTRY; the shell commands sketched below are one way to do both.
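A minimal sketch, assuming the Confluent CLI is logged in to this environment; `<SR_KEY>:<SR_SECRET>` is a placeholder Schema Registry API key:

```shell
# Read the backing topic from the beginning
confluent kafka topic consume users_demo --from-beginning --cluster lkc-nv010v

# List the subjects registered in Schema Registry
curl -u <SR_KEY>:<SR_SECRET> https://psrc-kk5gg.europe-west3.gcp.confluent.cloud/subjects
```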

## Add Data with DATAGEN

### User REFERENCE Data

#### Connector

```json
{
  "name": "UserDataGenerator",
  "config": {
    "connector.class": "DatagenSource",
    "name": "UserDataGenerator",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "CKFNMATDK2HDX43B",
    "kafka.api.secret": "voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5",
    "kafka.topic": "users_data",
    "output.data.format": "JSON_SR",
    "quickstart": "USERS_ARRAY",
    "max.interval": "5000",
    "tasks.max": "1"
  }
}
```
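The connector can be created from the CLI instead of the console; a sketch, assuming the JSON above is saved as `datagen-users.json` (the subcommand and flag name vary by CLI version, e.g. `confluent connect cluster create --config-file` in newer releases, `--config` in older ones):

```shell
# Submit the Datagen connector config to Confluent Cloud
confluent connect create --config-file datagen-users.json --environment env-0x3135 --cluster lkc-nv010v
```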

#### Stream

```sql
CREATE STREAM users_data WITH (KAFKA_TOPIC='users_data', VALUE_FORMAT='JSON_SR');
```

A sample record:

```json
{ "REGISTERTIME": 1508381460550, "USERID": "User_4", "REGIONID": "Region_3", "GENDER": "MALE", "INTERESTS": [ "News", "Movies" ], "CONTACTINFO": { "zipcode": "95112", "state": "CA", "phone": "4083366881", "city": "San Jose" } }
```

```sql
SELECT *
FROM USERS_DATA
WHERE USERID='User_2'
EMIT CHANGES;
```


```sql
CREATE STREAM USERS_DATA_FLAT WITH (KAFKA_TOPIC='users_data_flat', VALUE_FORMAT='JSON_SR') AS
SELECT USERID,
    GENDER,
    CONTACTINFO['phone'] AS PHONE,
    CONTACTINFO['city'] AS CITY,
    CASE WHEN CONTACTINFO['city']='San Carlos' THEN 'NORTH'
         WHEN CONTACTINFO['city']='San Mateo' THEN 'SOUTH'
         WHEN CONTACTINFO['city']='Palo Alto' THEN 'EAST'
         WHEN CONTACTINFO['city']='Irvine' THEN 'WEST'
         ELSE 'CENTER' END AS REGION
FROM USERS_DATA
EMIT CHANGES;
```
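`DESCRIBE` and `PRINT` are handy here to confirm the derived stream and its backing topic:

```sql
-- Show the schema of the derived stream
DESCRIBE USERS_DATA_FLAT;

-- Tail the underlying topic directly
PRINT 'users_data_flat';
```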


```sql
SELECT USERID, COUNT(*) AS COUNT
FROM USERS_DATA_FLAT
GROUP BY USERID
EMIT CHANGES;
```


```sql
SELECT USERID,
    LATEST_BY_OFFSET(GENDER) AS GENDER,
    LATEST_BY_OFFSET(PHONE) AS PHONE,
    LATEST_BY_OFFSET(CITY) AS CITY,
    LATEST_BY_OFFSET(REGION) AS REGION
FROM USERS_DATA_FLAT
GROUP BY USERID
EMIT CHANGES;
```


```sql
CREATE TABLE USERS_DATA_TBL WITH (KAFKA_TOPIC='users_data_tbl', VALUE_FORMAT='JSON_SR') AS
SELECT USERID,
    LATEST_BY_OFFSET(GENDER) AS GENDER,
    LATEST_BY_OFFSET(PHONE) AS PHONE,
    LATEST_BY_OFFSET(CITY) AS CITY,
    LATEST_BY_OFFSET(REGION) AS REGION
FROM USERS_DATA_FLAT
GROUP BY USERID
EMIT CHANGES;
```


```sql
-- THIS DOES NOT WORK
CREATE SOURCE TABLE USERS_DATA_SRC_TBL (
    USERID STRING PRIMARY KEY,
    GENDER STRING, PHONE STRING,
    CITY STRING, REGION STRING)
WITH (kafka_topic='users_data_flat', value_format='JSON_SR', value_schema_id=100020);
```
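If the explicit `value_schema_id` is what trips this up, one variant worth trying is to declare only the key and let ksqlDB infer the value columns from the schema registered for the topic (a sketch, not verified against this cluster):

```sql
-- Declare only the PRIMARY KEY; value columns are inferred from the
-- JSON_SR schema registered for the users_data_flat topic
CREATE SOURCE TABLE USERS_DATA_SRC_TBL (USERID STRING PRIMARY KEY)
WITH (kafka_topic='users_data_flat', value_format='JSON_SR');
```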

### Stock Trades DATA

```json
{
  "name": "StockTradesDataGenerator",
  "config": {
    "connector.class": "DatagenSource",
    "name": "DatagenSourceConnector_1",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "CKFNMATDK2HDX43B",
    "kafka.api.secret": "voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5",
    "kafka.topic": "stock_trades",
    "output.data.format": "JSON_SR",
    "quickstart": "STOCK_TRADES",
    "max.interval": "3000",
    "tasks.max": "1"
  }
}
```

```sql
CREATE STREAM stock_trades WITH (KAFKA_TOPIC='stock_trades', VALUE_FORMAT='JSON_SR');
```


```sql
SELECT USERS.USERID AS USERID,
    TRADES.SIDE,
    TRADES.SYMBOL,
    TRADES.QUANTITY,
    TRADES.PRICE,
    (TRADES.QUANTITY * TRADES.PRICE) AS TOTAL,
    USERS.REGION,
    USERS.PHONE
FROM STOCK_TRADES AS TRADES
LEFT JOIN USERS_DATA_TBL AS USERS ON TRADES.USERID = USERS.USERID
EMIT CHANGES;
```


```sql
CREATE STREAM USER_TRADES WITH (KAFKA_TOPIC='user_trades', VALUE_FORMAT='JSON_SR', PARTITIONS=3) AS
SELECT TRADES.USERID AS USERID,
    TRADES.SIDE,
    TRADES.SYMBOL,
    TRADES.QUANTITY,
    TRADES.PRICE,
    CASE WHEN TRADES.SIDE='BUY' THEN (TRADES.QUANTITY * TRADES.PRICE * -1)
         WHEN TRADES.SIDE='SELL' THEN (TRADES.QUANTITY * TRADES.PRICE)
    END AS TOTAL,
    USERS.REGION,
    USERS.PHONE
FROM STOCK_TRADES AS TRADES
LEFT JOIN USERS_DATA_TBL AS USERS ON TRADES.USERID = USERS.USERID
PARTITION BY TRADES.USERID
EMIT CHANGES;
```


```sql
SELECT USERID,
    SYMBOL,
    SUM(TOTAL),
    LATEST_BY_OFFSET(REGION) AS REGION,
    LATEST_BY_OFFSET(PHONE) AS PHONE
FROM USER_TRADES
GROUP BY USERID, SYMBOL
EMIT CHANGES;
```

```sql
CREATE TABLE USER_TRADES_TOTAL WITH (KAFKA_TOPIC='users_trade_total_tbl', VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') AS
SELECT USERID,
    SYMBOL,
    SUM(TOTAL) AS TOTAL,
    LATEST_BY_OFFSET(REGION) AS REGION,
    LATEST_BY_OFFSET(PHONE) AS PHONE
FROM USER_TRADES
GROUP BY USERID, SYMBOL
EMIT CHANGES;
```


```sql
SELECT USERID, SUM(TOTAL) AS TOTAL FROM USER_TRADES_TOTAL GROUP BY USERID EMIT CHANGES;
```
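Since USER_TRADES_TOTAL is materialized with a (USERID, SYMBOL) key, it also supports pull queries, i.e. point lookups without `EMIT CHANGES`; the key values below are illustrative:

```sql
-- Pull query: both key columns pinned with equality conditions;
-- 'User_2' and 'ZVZZT' are example values, not guaranteed to exist
SELECT * FROM USER_TRADES_TOTAL WHERE USERID='User_2' AND SYMBOL='ZVZZT';
```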

## Pull Query using REST API

The statements endpoint:

`https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/ksql`

Auth: Basic, using a ksqlDB API key and secret.

Body to list streams:

```json
{
  "ksql": "show streams;",
  "streamsProperties": {}
}
```

Body to list tables:

```json
{
  "ksql": "show tables;",
  "streamsProperties": {}
}
```

Body to insert a row:

```json
{
  "ksql": "INSERT INTO MOVEMENTS VALUES('Allison', 'Loveland');",
  "streamsProperties": {}
}
```


The query endpoint:

`https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/query`

Body for a full table scan:

```json
{
  "ksql": "SELECT * FROM PERSON_STATS;",
  "streamsProperties": {}
}
```

Body for a key lookup:

```json
{
  "ksql": "SELECT * FROM PERSON_STATS WHERE person ='Allison';",
  "streamsProperties": {}
}
```
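The same requests can be issued with curl; a sketch, with `<KSQL_KEY>:<KSQL_SECRET>` as a placeholder ksqlDB API key pair:

```shell
# List streams via the statements endpoint
curl -s -u <KSQL_KEY>:<KSQL_SECRET> \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "show streams;", "streamsProperties": {}}' \
  https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/ksql

# Run a query via the query endpoint
curl -s -u <KSQL_KEY>:<KSQL_SECRET> \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SELECT * FROM PERSON_STATS;", "streamsProperties": {}}' \
  https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/query
```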

## AdHoc

### Explode JSON Array of Structs

```sql
CREATE TYPE BALANCE_TYPE AS STRUCT<account_id STRING, balance DOUBLE>;

CREATE STREAM CUSTOMER_STATS (
  CUSTOMER_ID STRING KEY,
  CUSTOMER_NAME STRING,
  BALANCES ARRAY<BALANCE_TYPE>
) WITH (
  KAFKA_TOPIC='CUSTOMER_STATS',
  PARTITIONS=1,
  VALUE_FORMAT='JSON_SR');
```


```sql
INSERT INTO CUSTOMER_STATS (CUSTOMER_ID, CUSTOMER_NAME, BALANCES) VALUES (
  'DF1234', 'DENNIS', ARRAY[
    STRUCT(ACCOUNT_ID:='ACC123', BALANCE:=123.45),
    STRUCT(ACCOUNT_ID:='ACC456', BALANCE:=567.86),
    STRUCT(ACCOUNT_ID:='ACC996', BALANCE:=996.00)
    ]);

SELECT CUSTOMER_NAME, EXPLODE(BALANCES) AS BALANCE FROM CUSTOMER_STATS EMIT CHANGES;

SELECT CUSTOMER_NAME, EXPLODE(BALANCES)->ACCOUNT_ID, EXPLODE(BALANCES)->BALANCE FROM CUSTOMER_STATS EMIT CHANGES;
```
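The final aggregation reads from a CUSTOMER_BALANCES stream that is not defined in these notes; a minimal sketch of how it could be derived from CUSTOMER_STATS, assuming one row per account was intended:

```sql
-- Hypothetical: flatten BALANCES to one row per account so they can be summed per customer
CREATE STREAM CUSTOMER_BALANCES WITH (KAFKA_TOPIC='CUSTOMER_BALANCES', VALUE_FORMAT='JSON_SR') AS
SELECT CUSTOMER_ID,
    CUSTOMER_NAME,
    EXPLODE(BALANCES)->ACCOUNT_ID AS ACCOUNT_ID,
    EXPLODE(BALANCES)->BALANCE AS BALANCE
FROM CUSTOMER_STATS
EMIT CHANGES;
```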


```sql
CREATE TABLE CUSTOMER_TOTAL_BALANCE WITH (KAFKA_TOPIC='CUSTOMER_TOTAL_BALANCES', VALUE_FORMAT='AVRO') AS
SELECT CUSTOMER_ID,
    LATEST_BY_OFFSET(CUSTOMER_NAME) AS CUSTOMER_NAME,
    SUM(BALANCE) AS TOTAL_BALANCE
FROM CUSTOMER_BALANCES
GROUP BY CUSTOMER_ID
EMIT CHANGES;
```