environment=env-0x3135
kafka_cluster=lkc-nv010v
confluent schema-registry cluster enable --cloud gcp --geo eu --environment env-0x3135
+--------------+-----------------------------------------------------+
| Id | lsrc-k802op |
| Endpoint URL | https://psrc-kk5gg.europe-west3.gcp.confluent.cloud |
+--------------+-----------------------------------------------------+
## Service Account
```shell
confluent iam service-account create siemensSA --description "Siemens Demo Service Account"
+-------------+------------------------------+
| ID | sa-1jn0v3 |
| Name | siemensSA |
| Description | Siemens Demo Service Account |
+-------------+------------------------------+
confluent api-key create --service-account sa-1jn0v3 --environment env-0x3135 --resource lkc-nv010v --description "SiemensSA key for Kafka Cluster"
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | CKFNMATDK2HDX43B |
| Secret | voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5 |
+---------+------------------------------------------------------------------+
confluent ksql cluster create siemens_ksqldb_demo --csu 2 --environment env-0x3135 --cluster lkc-nv010v --api-key CKFNMATDK2HDX43B --api-secret voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5
+--------------+-----------------------------------------------------------+
| Id | lksqlc-120kk5 |
| Name | siemens_ksqldb_demo |
| Topic Prefix | pksqlc-75wq1 |
| Kafka | lkc-nv010v |
| Storage | 250 |
| Endpoint | https://pksqlc-75wq1.europe-west3.gcp.confluent.cloud:443 |
| Status | PROVISIONING |
+--------------+-----------------------------------------------------------+
confluent ksql cluster configure-acls lksqlc-120kk5 --environment env-0x3135 --cluster lkc-nv010v
confluent kafka acl create --service-account sa-1jn0v3 --cluster lkc-nv010v --allow --operation read --operation write --operation create --operation delete --topic "*"
- Create a STREAM and see how it translates to a topic
-- Demo stream keyed by user id; ksqlDB creates the backing 1-partition
-- topic 'users_demo' and registers a JSON Schema (JSON_SR) value schema
-- in Schema Registry.
CREATE STREAM users_demo (
id INTEGER KEY,
gender STRING,
name STRING,
age INTEGER
) WITH (
kafka_topic='users_demo',
partitions=1,
value_format='JSON_SR');
- Insert some data
-- Seed three rows; each INSERT produces one Kafka record keyed by id.
INSERT INTO users_demo (id, gender, name, age) VALUES (0, 'female', 'sarah', 42);
INSERT INTO users_demo (id, gender, name, age) VALUES (1, 'male', 'john', 28);
INSERT INTO users_demo (id, gender, name, age) VALUES (42, 'female', 'jessica', 70);
- Check the content of the STREAM
## REMEMBER TO SET THE `auto.offset.reset` to `Earliest`
select * from USERS_DEMO EMIT CHANGES;
- Check the content of the TOPIC
- Check SCHEMA-REGISTRY
{
"name": "UserDataGenerator",
"config": {
"connector.class": "DatagenSource",
"name": "UserDaraGenerator",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "CKFNMATDK2HDX43B",
"kafka.api.secret": "voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5",
"kafka.topic": "users_data",
"output.data.format": "JSON_SR",
"quickstart": "USERS_ARRAY",
"max.interval": "5000",
"tasks.max": "1"
}
}
CREATE STREAM users_data WITH (KAFKA_TOPIC='users_data', VALUE_FORMAT='JSON_SR');
{ "REGISTERTIME": 1508381460550, "USERID": "User_4", "REGIONID": "Region_3", "GENDER": "MALE", "INTERESTS": [ "News", "Movies" ], "CONTACTINFO": { "zipcode": "95112", "state": "CA", "phone": "4083366881", "city": "San Jose" } }
-- Push query filtered to a single user; runs until cancelled.
SELECT *
FROM USERS_DATA
WHERE USERID='User_2'
EMIT CHANGES;
-- Persistent query: flatten the nested CONTACTINFO struct and derive a
-- coarse REGION bucket from the city; results land in the new topic
-- 'users_data_flat'.
CREATE STREAM USERS_DATA_FLAT WITH (KAFKA_TOPIC='users_data_flat', VALUE_FORMAT='JSON_SR') AS
SELECT USERID,
GENDER,
CONTACTINFO['phone'] as PHONE,
CONTACTINFO['city'] as CITY,
-- Any city not listed below deliberately falls through to 'CENTER'.
CASE WHEN CONTACTINFO['city']='San Carlos' THEN 'NORTH'
WHEN CONTACTINFO['city']='San Mateo' THEN 'SOUTH'
WHEN CONTACTINFO['city']='Palo Alto' THEN 'EAST'
WHEN CONTACTINFO['city']='Irvine' THEN 'WEST'
ELSE 'CENTER' END AS REGION
FROM USERS_DATA
EMIT CHANGES;
-- Running event count per user. The aggregate is aliased USER_COUNT
-- rather than COUNT: COUNT is the aggregate function's own name, and
-- reusing it as a column alias is a reserved-word hazard in SQL dialects.
SELECT USERID, COUNT(*) AS USER_COUNT
FROM USERS_DATA_FLAT
GROUP BY USERID
EMIT CHANGES;
-- "Latest value per user" as a transient push query: LATEST_BY_OFFSET
-- keeps the most recently seen (by offset) value for each grouped key.
SELECT USERID,
LATEST_BY_OFFSET(GENDER) AS GENDER,
LATEST_BY_OFFSET(PHONE) AS PHONE,
LATEST_BY_OFFSET(CITY) AS CITY,
LATEST_BY_OFFSET(REGION) AS REGION
FROM USERS_DATA_FLAT
GROUP BY (USERID)
EMIT CHANGES;
-- Materialize the previous query as a TABLE: one up-to-date row per
-- USERID, backed by the compacted-changelog topic 'users_data_tbl'.
CREATE TABLE USERS_DATA_TBL WITH (KAFKA_TOPIC='users_data_tbl', VALUE_FORMAT='JSON_SR') AS
SELECT USERID,
LATEST_BY_OFFSET(GENDER) AS GENDER,
LATEST_BY_OFFSET(PHONE) AS PHONE,
LATEST_BY_OFFSET(CITY) AS CITY,
LATEST_BY_OFFSET(REGION) AS REGION
FROM USERS_DATA_FLAT
GROUP BY (USERID)
EMIT CHANGES;
## NOTE: THE NEXT STATEMENT DOES NOT WORK AS-IS — `value_schema_id` must reference a schema id that already exists in Schema Registry (100020 is a placeholder)
-- NOTE(review): kept as a documented failure (see heading above).
-- CREATE SOURCE TABLE builds a read-only table over an existing topic;
-- value_schema_id must name a schema id that actually exists in Schema
-- Registry — 100020 is presumably a placeholder, so the statement is
-- rejected. TODO: confirm the real id via the Schema Registry API.
CREATE SOURCE TABLE USERS_DATA_SRC_TBL (
USERID string PRIMARY KEY,
GENDER string, PHONE string,
CITY string, REGION string)
WITH (kafka_topic='users_data_flat', value_format='JSON_SR', value_schema_id=100020);
{
"name": "StockTradesDataGenerator",
"config": {
"connector.class": "DatagenSource",
"name": "DatagenSourceConnector_1",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "CKFNMATDK2HDX43B",
"kafka.api.secret": "voDoUuJ1FwElE6p3TB8qICNyFjoTROCHMWkJZQ2voCEOE10EeVTrSPyWZhafzxl5",
"kafka.topic": "stock_trades",
"output.data.format": "JSON_SR",
"quickstart": "STOCK_TRADES",
"max.interval": "3000",
"tasks.max": "1"
}
}
CREATE STREAM stock_trades WITH (KAFKA_TOPIC='stock_trades', VALUE_FORMAT='JSON_SR');
-- Stream-table join: enrich each live trade with the user's latest
-- profile. LEFT JOIN keeps trades whose user has no row in
-- USERS_DATA_TBL yet (the USERS.* columns come back NULL).
SELECT USERS.USERID AS USERID,
TRADES.SIDE,
TRADES.SYMBOL,
TRADES.QUANTITY,
TRADES.PRICE,
(TRADES.QUANTITY * TRADES.PRICE) AS TOTAL,
USERS.REGION,
USERS.PHONE
FROM STOCK_TRADES AS TRADES
LEFT JOIN USERS_DATA_TBL AS USERS ON TRADES.USERID = USERS.USERID EMIT CHANGES;
-- Persisted enriched trades, re-keyed/repartitioned by user id into a
-- 3-partition topic. TOTAL is signed cash flow: negative for BUY
-- (money out), positive for SELL (money in).
CREATE STREAM USER_TRADES WITH (KAFKA_TOPIC='user_trades', VALUE_FORMAT='JSON_SR', PARTITIONS=3) AS
SELECT TRADES.USERID AS USERID,
TRADES.SIDE,
TRADES.SYMBOL,
TRADES.QUANTITY,
TRADES.PRICE,
-- No ELSE branch: any SIDE other than BUY/SELL yields a NULL TOTAL.
CASE WHEN TRADES.SIDE='BUY' THEN (TRADES.QUANTITY * TRADES.PRICE * -1)
WHEN TRADES.SIDE='SELL' THEN (TRADES.QUANTITY * TRADES.PRICE)
END AS TOTAL,
USERS.REGION,
USERS.PHONE
FROM STOCK_TRADES AS TRADES
LEFT JOIN USERS_DATA_TBL AS USERS ON TRADES.USERID = USERS.USERID
PARTITION BY TRADES.USERID
EMIT CHANGES;
-- Net position per (user, symbol). SUM(TOTAL) now carries an explicit
-- alias — the original left it unnamed, so ksqlDB would auto-generate a
-- column name (KSQL_COL_n); aliasing matches every sibling query here
-- and the persisted version at the CREATE TABLE below.
SELECT USERID,
SYMBOL,
SUM(TOTAL) AS TOTAL,
LATEST_BY_OFFSET(REGION) AS REGION,
LATEST_BY_OFFSET(PHONE) AS PHONE
FROM USER_TRADES
GROUP BY USERID, SYMBOL
EMIT CHANGES;
-- Materialized net position per (user, symbol). KEY_FORMAT is set
-- explicitly because the grouping key is composite (USERID, SYMBOL).
CREATE TABLE USER_TRADES_TOTAL WITH (KAFKA_TOPIC='users_trade_total_tbl', VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') AS
SELECT USERID,
SYMBOL,
SUM(TOTAL) AS TOTAL,
LATEST_BY_OFFSET(REGION) AS REGION,
LATEST_BY_OFFSET(PHONE) AS PHONE
FROM USER_TRADES
GROUP BY USERID, SYMBOL
EMIT CHANGES;
SELECT USERID, SUM(TOTAL) AS TOTAL FROM USER_TRADES_TOTAL GROUP BY USERID EMIT CHANGES;
https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/ksql
#auth
Basic
#body
{
"ksql": "show streams;",
"streamProperties":{}
}
#body
{
"ksql": "show tables;",
"streamProperties":{}
}
#body - insert
{
"ksql": "INSERT INTO MOVEMENTS VALUES('Allison', 'Loveland');",
"streamProperties":{}
}
https://pksqlc-gw90v.europe-west3.gcp.confluent.cloud:443/query
#body - query
{
"ksql": "SELECT * FROM PERSON_STATS;",
"streamProperties":{}
}
#body - query
{
"ksql": "SELECT * FROM PERSON_STATS WHERE person ='Allison';",
"streamProperties":{}
}
CREATE TYPE BALANCE_TYPE AS STRUCT<account_id STRING, balance DOUBLE>;
-- Stream whose value holds an ARRAY of BALANCE_TYPE structs, i.e. all
-- of a customer's account balances in a single record.
CREATE STREAM CUSTOMER_STATS (
CUSTOMER_ID STRING KEY,
CUSTOMER_NAME STRING,
BALANCES ARRAY<BALANCE_TYPE>
) WITH (
KAFKA_TOPIC='CUSTOMER_STATS',
PARTITIONS=1,
VALUE_FORMAT='JSON_SR');
-- Seed one customer with three account balances; note ksqlDB's
-- STRUCT(field := value) constructor syntax.
INSERT INTO CUSTOMER_STATS (CUSTOMER_ID, CUSTOMER_NAME, BALANCES) VALUES (
'DF1234', 'DENNIS', ARRAY[
STRUCT(ACCOUNT_ID:='ACC123', BALANCE:=123.45),
STRUCT(ACCOUNT_ID:='ACC456', BALANCE:=567.86),
STRUCT(ACCOUNT_ID:='ACC996', BALANCE:=996.00)
]);
-- EXPLODE is a table function: each array element becomes its own row.
SELECT CUSTOMER_NAME, EXPLODE(BALANCES) AS BALANCE FROM CUSTOMER_STATS EMIT CHANGES;
-- Multiple EXPLODEs over the same array are zipped element-by-element,
-- so ACCOUNT_ID and BALANCE stay paired per output row.
SELECT CUSTOMER_NAME, EXPLODE(BALANCES)->ACCOUNT_ID, EXPLODE(BALANCES)->BALANCE FROM CUSTOMER_STATS EMIT CHANGES;
-- Total balance per customer. NOTE(review): CUSTOMER_BALANCES is not
-- defined anywhere in this file — presumably a CSAS built from the
-- EXPLODE query above; verify it exists before running. Also note two
-- inconsistencies to confirm as intentional: VALUE_FORMAT is AVRO here
-- (JSON_SR everywhere else), and the table name CUSTOMER_TOTAL_BALANCE
-- differs from its topic CUSTOMER_TOTAL_BALANCES.
CREATE TABLE CUSTOMER_TOTAL_BALANCE WITH (KAFKA_TOPIC='CUSTOMER_TOTAL_BALANCES', VALUE_FORMAT='AVRO') AS SELECT
CUSTOMER_ID, LATEST_BY_OFFSET(CUSTOMER_NAME) AS CUSTOMER_NAME, SUM(BALANCE) AS TOTAL_BALANCE FROM CUSTOMER_BALANCES GROUP BY CUSTOMER_ID EMIT CHANGES;