Live Coding a KSQL Application
Introduction
This code file accompanies the slides and recording available online at: https://www.confluent.io/online-talks/
Don’t forget to check out the #ksql channel on our Community Slack group.
——Robin Moffatt @rmoff, 28 Feb 2018
Pre-demo
Built using Confluent Platform 4.0 and KSQL 0.5.
Start Confluent Platform
confluent destroy
confluent start
Robin@asgard02 ~/c/confluent-4.0.0> confluent start
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Robin@asgard02 ~/c/confluent-4.0.0>
Populate Users topic
kafkacat -P -b localhost:9092 -t mysql_users -K ":"
Paste the data to stdin (one message per line):
1:{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
3:{"uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
4:{"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
5:{"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
6:{"uid":6,"name":"Hojjat","locale":"en_US","address_city":"Palo Alto","elite":"S"}
7:{"uid":7,"name":"Damian","locale":"en_US","address_city":"London","elite":"S"}
8:{"uid":8,"name":"Apurva","locale":"en_US","address_city":"Palo Alto","elite":"S"}
9:{"uid":9,"name":"Neha","locale":"en_US","address_city":"Palo Alto","elite":"P"}
10:{"uid":10,"name":"Mattias","locale":"DE","address_city":"Palo Alto","elite":"S"}
11:{"uid":11,"name":"Gwen","locale":"en_US","address_city":"Palo Alto","elite":"P"}
12:{"uid":12,"name":"Guozhang","locale":"en_US","address_city":"Palo Alto","elite":"S"}
13:{"uid":13,"name":"Lu","locale":"en_US","address_city":"Palo Alto","elite":"P"}
14:{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}
Exit: Ctrl-D
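The -K ":" option tells kafkacat which delimiter separates the message key from the value on each input line. A minimal Python sketch of that split, assuming only the first delimiter on a line counts (the JSON value itself contains colons); parse_keyed_line is a hypothetical helper for illustration, not part of kafkacat:

```python
import json

def parse_keyed_line(line, delimiter=":"):
    """Split one 'key<delimiter>value' input line into (key, parsed JSON value)."""
    # Split on the first delimiter only: the JSON value contains colons too.
    key, _, value = line.partition(delimiter)
    return key, json.loads(value)

sample = '2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}'
key, value = parse_keyed_line(sample)
print(key, value["name"], value["address_city"])
```

The key stays a string ("2") while the uid inside the value is a number. That is consistent with ROWKEY showing up as a VARCHAR in the DESCRIBE output of this demo.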
Inspect Users topic data
kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users
kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users
[...]
Key: 4
Value: {"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
Key: 5
Value: {"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
[...]
Demo
Note: Create blank window in [...]
Check Confluent is running
confluent status
Robin@asgard02 ~/c/confluent-4.0.0> confluent status
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
Run datagen - ratings
~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
Robin@asgard02 ~/c/confluent-4.0.0> ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
1 --> ([ 1 | 10 | 3 | 9297 | 1519302621843 | 'iOS' | 'your team here rocks!' ])
2 --> ([ 2 | 10 | 1 | 3415 | 1519302622767 | 'android' | 'Surprisingly good, maybe you are getting your mojo back at long last!' ])
3 --> ([ 3 | 17 | 2 | 5197 | 1519302623090 | 'web' | 'meh' ])
4 --> ([ 4 | 9 | 3 | 9389 | 1519302623414 | 'android' | 'is this as good as it gets? really ?' ])
Start KSQL Server
~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
$ ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2018 Confluent Inc.
Server 0.5 listening on http://localhost:8090
To access the KSQL CLI, run:
ksql-cli remote http://localhost:8090
To access the UI, point your browser at:
http://localhost:8090/index.html
Launch KSQL CLI
Connect to KSQL server running locally
~/git/ksql/bin/ksql-cli remote http://localhost:8090/
Robin@asgard02 ~> ~/git/ksql/bin/ksql-cli remote http://localhost:8090/
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2018 Confluent Inc.
CLI v0.5, Server v0.5 located at http://localhost:8090/
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
See available Kafka topics
show topics;
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
-----------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
connect-configs | false | 1 | 1 | 0 | 0
connect-offsets | false | 25 | 1 | 0 | 0
connect-statuses | false | 5 | 1 | 0 | 0
ksql__commands | true | 1 | 1 | 0 | 0
ratings | false | 1 | 1 | 0 | 0
users | false | 1 | 1 | 0 | 0
-----------------------------------------------------------------------------------------------
Inspect a topic contents - Ratings
Tip: You don’t need to know the format of the data. PRINT detects it and shows the column names and values.
PRINT 'ratings';
Explain TS/Key/Message concept
ksql> PRINT 'ratings';
Format:AVRO
22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}
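Each PRINT line carries the three parts of a Kafka message: timestamp, key, and value. A small Python sketch that pulls them apart, assuming the comma-separated layout shown in the output above and a key that contains no ", " itself; parse_print_line is a hypothetical helper, not a KSQL API:

```python
import json

def parse_print_line(line):
    """Split a PRINT output line into (timestamp, key, value as dict)."""
    # maxsplit=2 keeps the commas inside the JSON value intact.
    timestamp, key, value = line.split(", ", 2)
    return timestamp, key, json.loads(value)

line = ('22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, '
        '"route_id": 2440, "rating_time": 1519304104965, "channel": "web", '
        '"message": "Surprisingly good, maybe you are getting your mojo back at long last!"}')
timestamp, key, value = parse_print_line(line)
print(timestamp, "|", key, "|", value["stars"], value["channel"])
```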
Inspect a topic contents - Users
As with the ratings topic, there is no need to know the format of the data: PRINT shows the column names and values.
PRINT 'mysql_users' FROM BEGINNING;
Tell KSQL to process from beginning of topic
Set the offset reset policy to earliest so that queries read each topic from its beginning:
SET 'auto.offset.reset' = 'earliest';
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
Register Ratings topic for querying
CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
Why’s it a stream? Because the ratings are an unbounded, continuous series of events.
ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
Message
---------------
Table created
---------------
Describe ratings stream
DESCRIBE ratings;
Note:
- System columns for timestamp and key
- All the other columns have been picked up automagically; we have not had to specify them
ksql> DESCRIBE ratings;
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Query ratings stream
SELECT * FROM ratings;
This is a continuous query!
ksql> SELECT * FROM ratings;
1519402268942 | 1 | 1 | 13 | 1 | 3700 | 1519402267832 | ios | airport refurb looks great, will fly outta here more!
1519402269200 | 2 | 2 | 12 | 2 | 9907 | 1519402269200 | android | (expletive deleted)
1519402269694 | 3 | 3 | 2 | 1 | 5421 | 1519402269694 | android | is this as good as it gets? really ?
1519402269857 | 4 | 4 | 18 | 2 | 1462 | 1519402269856 | android | your team here rocks!
Cancel the datagen task - note that the query stops.
Restart the datagen task - query now continues to return data
Filter the ratings stream
SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
Note the use of LIMIT so that we just see a sample of the stream of data.
ksql> SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
1519402272247 | 13 | 13 | 9 | 1 | 2545 | 1519402272247 | iOS | more peanuts please
1519402272750 | 14 | 14 | 0 | 2 | 4419 | 1519402272749 | iOS | airport refurb looks great, will fly outta here more!
1519402273755 | 18 | 18 | 15 | 1 | 5306 | 1519402273755 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
1519402278686 | 37 | 37 | 17 | 1 | 725 | 1519402278686 | iOS | meh
1519402279186 | 39 | 39 | 10 | 1 | 6304 | 1519402279186 | iOS | (expletive deleted)
LIMIT reached for the partition.
Query terminated
ksql>
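The same filter-then-limit idea, sketched in Python over an in-memory list. The sample events are hypothetical; the channel spellings ('ios', 'iOS', 'iOS-test') mirror values that appear in the demo output, and the comparison is case-sensitive, as it is in the WHERE clause above:

```python
from itertools import islice

def poor_ios_ratings(ratings):
    """Lazily yield ratings with fewer than 3 stars sent via the 'iOS' channel."""
    return (r for r in ratings if r["stars"] < 3 and r["channel"] == "iOS")

# Hypothetical sample events for illustration only.
ratings = [
    {"rating_id": 1, "stars": 1, "channel": "ios"},       # wrong case: filtered out
    {"rating_id": 2, "stars": 2, "channel": "iOS"},
    {"rating_id": 3, "stars": 4, "channel": "iOS"},       # too many stars
    {"rating_id": 4, "stars": 1, "channel": "iOS-test"},  # different channel
    {"rating_id": 5, "stars": 1, "channel": "iOS"},
]

# islice plays the role of LIMIT: stop after at most 5 matches.
sample = list(islice(poor_ios_ratings(ratings), 5))
print([r["rating_id"] for r in sample])
```

Unlike this list, the real stream is unbounded, so without LIMIT the query keeps running until it is cancelled.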
Persist a filtered stream
Create the stream
Let’s take the poor ratings from people with iOS devices, and create a new stream from them!
CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
Message
----------------------------
Stream created and running
----------------------------
Inspect the stream
DESCRIBE POOR_RATINGS;
ksql> DESCRIBE POOR_RATINGS;
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
Inspect the stream further
DESCRIBE EXTENDED POOR_RATINGS;
ksql> DESCRIBE EXTENDED POOR_RATINGS;
Type : STREAM
Key field :
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : POOR_RATINGS (partitions: 4, replication: 1)
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
RATING_ID | BIGINT
USER_ID | INTEGER
STARS | INTEGER
ROUTE_ID | INTEGER
RATING_TIME | BIGINT
CHANNEL | VARCHAR(STRING)
MESSAGE | VARCHAR(STRING)
-----------------------------------------
Queries that write into this STREAM
-----------------------------------
id:CSAS_POOR_RATINGS - CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.33 total-messages: 33 last-message: 27/02/18 11:28:58 GMT
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
ksql>
Query the stream
SELECT * FROM POOR_RATINGS;
ksql> SELECT * FROM POOR_RATINGS;
1519730883770 | 1608 | 1608 | 18 | 2 | 829 | 1519730883770 | iOS | your team here rocks!
1519730881132 | 1596 | 1596 | 6 | 2 | 3185 | 1519730881132 | iOS | thank you for the most friendly, helpful experience today at your new lounge
1519730886192 | 1615 | 1615 | 19 | 1 | 9409 | 1519730886192 | iOS | more peanuts please
1519730880186 | 1591 | 1591 | 7 | 1 | 9830 | 1519730880186 | iOS | is this as good as it gets? really ?
1519730894709 | 1655 | 1655 | 7 | 2 | 6036 | 1519730894709 | iOS | meh
See the Kafka Topic
From the command line, use the standard Kafka tools to interact with the new stream — it’s just a Kafka topic!
kafka-topics --zookeeper localhost:2181 --list
Robin@asgard02 ~> kafka-topics --zookeeper localhost:2181 --list
POOR_RATINGS
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
ksql__commands
mysql_users
ratings
Inspect the Kafka topic’s data
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic POOR_RATINGS --from-beginning | jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic POOR_RATINGS --from-beginning | jq '.'
{
"RATING_ID": {
"long": 1615
},
"USER_ID": {
"int": 19
},
"STARS": {
"int": 1
},
"ROUTE_ID": {
"int": 9409
},
"RATING_TIME": {
"long": 1519730886192
},
"CHANNEL": {
"string": "iOS"
},
"MESSAGE": {
"string": "more peanuts please"
}
}
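Each field in the consumer output is wrapped in a single-entry object such as {"long": 1615}. This is how Avro's JSON encoding represents a non-null value of a union type (here, presumably a nullable field). A minimal Python sketch for flattening such a record; unwrap_unions is a hypothetical helper name:

```python
def unwrap_unions(record):
    """Flatten {"FIELD": {"type": value}} wrappers into {"FIELD": value}."""
    flat = {}
    for field, wrapped in record.items():
        if isinstance(wrapped, dict) and len(wrapped) == 1:
            # Single-entry dict: the key is the Avro type, the value is the payload.
            (value,) = wrapped.values()
            flat[field] = value
        else:
            # null (None) or an already-plain value is kept as-is.
            flat[field] = wrapped
    return flat

record = {
    "RATING_ID": {"long": 1615},
    "USER_ID": {"int": 19},
    "STARS": {"int": 1},
    "CHANNEL": {"string": "iOS"},
    "MESSAGE": {"string": "more peanuts please"},
}
print(unwrap_unions(record))
```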
Joining Data in KSQL
Remember our Users data? Let’s bring that into play, and use it to enrich the inbound stream of ratings data.
Inspect Users Data
Let’s check the data first, using the very handy PRINT command:
PRINT 'mysql_users' FROM BEGINNING;
ksql> PRINT 'mysql_users' FROM BEGINNING;
Format:JSON
{"ROWTIME":1519640382207,"ROWKEY":"1","uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"ROWTIME":1519640382207,"ROWKEY":"2","uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
{"ROWTIME":1519640382207,"ROWKEY":"3","uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
Create Users Table
Now, create a TABLE over the Kafka topic. Why’s it a table? Because for each key (user id), we want to know its current value (name, address, etc.).
CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');
Note that we’ve specified the KEY here, which must match the key of the Kafka message too.
ksql> CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');
Message
---------------
Table created
---------------
ksql>
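To make the table idea concrete: a table keeps only the latest value per key. A rough Python sketch replays a few of the keyed messages loaded into mysql_users earlier, including the three updates for key 2; materialize is a hypothetical name and the values are trimmed to three fields for brevity:

```python
def materialize(messages):
    """Replay (key, value) messages in order, keeping the latest value per key."""
    table = {}
    for key, value in messages:
        table[key] = value  # a later message for the same key overwrites the earlier one
    return table

changelog = [
    ("2", {"name": "Nick", "address_city": "Palo Alto", "elite": "G"}),
    ("14", {"name": "Robin", "address_city": "Ilkley", "elite": "G"}),
    ("2", {"name": "Nick", "address_city": "San Francisco", "elite": "G"}),
    ("2", {"name": "Nick", "address_city": "Palo Alto", "elite": "P"}),
]
users = materialize(changelog)
print(len(users), users["2"])
```

Four messages go in, two rows come out. A stream over the same topic would still contain all four events.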
Stream-Table join (1)
Now let’s join our ratings data, which includes user ID, to our user information:
Basics to start with — rating message plus the user’s name.
Couple of things to note:
* We’re aliasing the table and stream names to make column names unambiguous
* I’m using the backslash line continuation character
SELECT R.MESSAGE, U.NAME \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID \
LIMIT 5;
ksql> SELECT R.MESSAGE, U.NAME \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID \
> LIMIT 5;
airport refurb looks great, will fly outta here more! | Damian
Exceeded all my expectations. Thank you ! | Neil
(expletive deleted) | Lu
why is it so difficult to keep the bathrooms clean ? | Apurva
thank you for the most friendly, helpful experience today at your new lounge | Cliff
LIMIT reached for the partition.
Query terminated
ksql>
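Conceptually, a stream-table LEFT JOIN looks up each arriving event in the table's current state. A simplified Python sketch, using a plain dict for the table and ignoring timing and partitioning; left_join and the sample ratings are hypothetical, and user id 0 stands in for a rating whose user is not in the table:

```python
def left_join(ratings, users):
    """Yield (message, name) per rating; name is None when no user matches."""
    for rating in ratings:
        user = users.get(rating["user_id"])
        yield rating["message"], (user["name"] if user else None)

users = {7: {"name": "Damian"}, 5: {"name": "Neil"}}
ratings = [
    {"user_id": 7, "message": "airport refurb looks great, will fly outta here more!"},
    {"user_id": 5, "message": "Exceeded all my expectations. Thank you !"},
    {"user_id": 0, "message": "meh"},  # no such user: LEFT JOIN keeps the row
]
for message, name in left_join(ratings, users):
    print(message, "|", name)
```

Because it is a LEFT join, the unmatched rating is still emitted, with an empty user side.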
Stream-Table join (2)
Now let’s pull the full set of data, including a reformat of the timestamp into something human readable:
Note the IS NOT NULL clause to filter out any ratings with no corresponding user data.
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
> R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
18196 | 1 | 1790 | 2018-02-26 12:42:38 | iOS-test | Surprisingly good, maybe you are getting your mojo back at long last! | Robin | Ilkley | G
18197 | 4 | 2862 | 2018-02-26 12:42:38 | iOS-test | your team here rocks! | Jeremy | Austin | P
18198 | 1 | 6954 | 2018-02-26 12:42:39 | ios | Exceeded all my expectations. Thank you ! | Nick | Palo Alto | P
18199 | 1 | 2092 | 2018-02-26 12:42:39 | ios | (expletive deleted) | Neil | London | G
18200 | 2 | 6042 | 2018-02-26 12:42:39 | ios | more peanuts please | Neil | London | G
18202 | 2 | 1133 | 2018-02-26 12:42:39 | iOS-test | thank you for the most friendly, helpful experience today at your new lounge | Neha | Palo Alto | P
18205 | 3 | 4086 | 2018-02-26 12:42:40 | ios | meh | Neil | London | G
18206 | 1 | 9217 | 2018-02-26 12:42:41 | web | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
18207 | 3 | 2655 | 2018-02-26 12:42:41 | android | more peanuts please | Neha | Palo Alto | P
18209 | 3 | 2086 | 2018-02-26 12:42:42 | iOS-test | meh | Lu | Palo Alto | P
18210 | 2 | 1629 | 2018-02-26 12:42:42 | web | airport refurb looks great, will fly outta here more! | Neha | Palo Alto | P
^CQuery terminated
Stream-Table join (3)
Let’s persist this as an enriched stream:
CREATE STREAM RATINGS_FULL AS \
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> CREATE STREAM RATINGS_FULL AS \
> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
> R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Filtering an enriched stream
Which of our Premier customers are not happy?
SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
ksql> SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
1519649077925 | 2 | 18684 | 1 | 3895 | 2018-02-26 12:44:37 | ios | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
1519649078280 | 13 | 18685 | 1 | 4088 | 2018-02-26 12:44:38 | iOS | more peanuts please | Lu | Palo Alto | P
1519649079664 | 1 | 18691 | 2 | 8933 | 2018-02-26 12:44:39 | iOS-test | more peanuts please | Cliff | St Louis | P
1519649084065 | 13 | 18709 | 1 | 4366 | 2018-02-26 12:44:44 | iOS-test | airport refurb looks great, will fly outta here more! | Lu | Palo Alto | P
1519649084603 | 3 | 18712 | 2 | 744 | 2018-02-26 12:44:44 | ios | worst. flight. ever. #neveragain | Jeremy | Austin | P
^CQuery terminated
Persist the filtered & enriched stream
CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
ksql> CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Query the new stream
SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
ksql> SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
1 | why is it so difficult to keep the bathrooms clean ? | Nick
1 | is this as good as it gets? really ? | Jeremy
1 | thank you for the most friendly, helpful experience today at your new lounge | Jeremy
1 | your team here rocks! | Lu
1 | thank you for the most friendly, helpful experience today at your new lounge | Lu
1 | why is it so difficult to keep the bathrooms clean ? | Jeremy
View the underlying topic data
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic UNHAPPY_VIPS --from-beginning
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic UNHAPPY_VIPS --from-beginning
{"RATING_ID":{"long":25},"STARS":{"int":1},"ROUTE_ID":{"int":1548},"KSQL_COL_3":{"string":"2018-02-27 11:21:21"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"thank you for the most friendly, helpful experience today at your new lounge"},"NAME":{"string":"Neha"},"ADDRESS_CITY":{"string":"Palo Alto"},"ELITE":{"string":"P"}}
{"RATING_ID":{"long":31},"STARS":{"int":2},"ROUTE_ID":{"int":451},"KSQL_COL_3":{"string":"2018-02-27 11:21:22"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"Exceeded all my expectations. Thank you !"},"NAME":{"string":"Cliff"},"ADDRESS_CITY":{"string":"St Louis"},"ELITE":{"string":"P"}}
Streaming Aggregates
Explain windowing:
- Tumbling (e.g. every 5 minutes: 00:00, 00:05, 00:10)
- Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)
- Session (sets a timeout for the given key, after which any new data is treated as a new session)
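The three window types can be sketched in plain Python to show which window(s) an event timestamp lands in. This illustrates the concept only and is not how KSQL implements it; all function names are hypothetical, times are epoch milliseconds, and windows are assumed to be aligned to the epoch:

```python
def tumbling_window_start(ts_ms, size_ms):
    """Start of the single fixed-size window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)

def hopping_window_starts(ts_ms, size_ms, advance_ms):
    """Starts of every overlapping window [start, start + size) containing ts_ms."""
    starts = []
    start = ts_ms - (ts_ms % advance_ms)  # latest window starting at or before ts_ms
    while start > ts_ms - size_ms:        # window still covers ts_ms
        starts.append(start)
        start -= advance_ms
    return sorted(starts)

def session_numbers(sorted_ts_ms, gap_ms):
    """Number the sessions for one key: a gap longer than gap_ms starts a new one."""
    numbers, current, previous = [], 0, None
    for ts in sorted_ts_ms:
        if previous is not None and ts - previous > gap_ms:
            current += 1
        numbers.append(current)
        previous = ts
    return numbers

MINUTE = 60_000
print(tumbling_window_start(1519730883770, MINUTE))
print(hopping_window_starts(450_000, 5 * MINUTE, MINUTE))
print(session_numbers([0, 1_000, 10_000, 10_500], gap_ms=5_000))
```

An event belongs to exactly one tumbling window, but to size/advance hopping windows (five in the example above).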
Running Count per Minute
Show count of ratings per City, per minute
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Geneva | 11
Geneva | 11
Geneva | 14
Geneva | 10
Geneva | 19
CREATE TABLE
Let’s persist that into a TABLE. So far we’ve only worked with a STREAM. A table gives the state of a given key at a given point in time. So here: for each city, in each one-minute window, what is the total count?
CREATE TABLE RATINGS_BY_CITY AS \
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> CREATE TABLE RATINGS_BY_CITY AS \
> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Message
---------------------------
Table created and running
---------------------------
ksql>
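A rough Python sketch of what this aggregate maintains: one counter per (city, one-minute window), with an updated row emitted each time an event arrives. The event data and the running_counts name are hypothetical; the window arithmetic assumes epoch-aligned tumbling windows:

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one-minute tumbling windows

def running_counts(events):
    """Yield (city, window_start, count) after every event, like a changelog."""
    state = defaultdict(int)
    for ts_ms, city in events:
        window_start = ts_ms - ts_ms % WINDOW_MS
        state[(city, window_start)] += 1
        yield city, window_start, state[(city, window_start)]

# Hypothetical (timestamp, city) events for illustration only.
events = [
    (1519730460500, "Geneva"),
    (1519730461000, "Ilkley"),
    (1519730462000, "Geneva"),
    (1519730520100, "Geneva"),  # falls into the next minute's window
]
for update in running_counts(events):
    print(update)
```

The count for a given city and window grows while that window is current, then a new key (same city, next window start) begins again at 1.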
Examine the created table’s columns
DESCRIBE RATINGS_BY_CITY;
Note that the city is denoted as (key)
ksql> DESCRIBE RATINGS_BY_CITY;
Field | Type
------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ADDRESS_CITY | VARCHAR(STRING) (key)
RATING_COUNT | BIGINT
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
Point out the system columns - ROWTIME and ROWKEY.
Examine the contents of the new table’s columns
SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
ksql> SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | Geneva | 11
1519730460000 | Ilkley : Window{start=1519730460000 end=-} | Ilkley | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | Geneva | 11
1519730460000 | St Louis : Window{start=1519730460000 end=-} | St Louis | 9
1519730580000 | Geneva : Window{start=1519730580000 end=-} | Geneva | 14
Using Functions like TIMESTAMPTOSTRING
KSQL comes with a bunch of functions, both scalar and aggregate (like COUNT, which we saw previously).
Let’s convert the ROWTIME epoch value to a more readable one:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
2018-02-27 11:21:00 | London | 25
2018-02-27 11:21:00 | Palo Alto | 71
2018-02-27 11:22:00 | Palo Alto | 89
2018-02-27 11:22:00 | London | 24
2018-02-27 11:23:00 | London | 25
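For comparison, the same epoch-milliseconds-to-string conversion in Python. This sketch assumes UTC, which matches the GMT timestamps in this demo; timestamp_to_string is a hypothetical stand-in, and the Java-style pattern 'yyyy-MM-dd HH:mm:ss' corresponds to '%Y-%m-%d %H:%M:%S' here:

```python
from datetime import datetime, timezone

def timestamp_to_string(epoch_ms):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC (sub-second part dropped)."""
    moment = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

print(timestamp_to_string(1519730460000))  # 2018-02-27 11:21:00
```

The input is the window start seen in ROWTIME earlier (1519730460000), which falls exactly on a minute boundary.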
Filtering the aggregate table
This table is a first-class object in KSQL, which we can query and filter like any other:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
2018-02-27 11:21:00 | Ilkley | 11
2018-02-27 11:22:00 | Ilkley | 6
2018-02-27 11:23:00 | Ilkley | 13
2018-02-27 11:24:00 | Ilkley | 14
2018-02-27 11:25:00 | Ilkley | 15
Note how aggregates update within the current window.
Check out the Kafka topic
Let’s step out of KSQL for a moment. When you create a STREAM or TABLE in KSQL, it is backed by a Kafka topic. Let’s check this out:
kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
Robin@asgard02 ~/c/confluent-4.0.0> kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
RATINGS_BY_CITY
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-changelog
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-repartition
[...]
And it’s just a Kafka topic, which we can consume from like any other:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY --from-beginning --max-messages 5| jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY2 --from-beginning -max-messages 5| jq '.'
{
"ADDRESS_CITY": {
"string": "London"
},
"RATING_COUNT": {
"long": 25
}
}
{
"ADDRESS_CITY": {
"string": "Palo Alto"
},
"RATING_COUNT": {
"long": 71
}
}
Where’s the window? It’s a system column that’s part of the message key, so it does not appear in the message value. To expose it further, we could do a CREATE TABLE that uses the TIMESTAMPTOSTRING function to write the system column out as a regular column.
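The ROWKEY values displayed by the CLI earlier, such as 'Geneva : Window{start=1519730460000 end=-}', combine the grouping key with the window start. A small Python sketch that splits that display string apart; it assumes exactly the textual layout printed in this demo (not the binary key stored in Kafka), and parse_windowed_key is a hypothetical helper:

```python
import re

# Pattern for the CLI's display form: '<key> : Window{start=<millis> end=<...>}'
WINDOWED_KEY = re.compile(
    r"^(?P<key>.+) : Window\{start=(?P<start>\d+) end=(?P<end>[^}]*)\}$"
)

def parse_windowed_key(rowkey):
    """Return (grouping key, window start in epoch ms) from a displayed ROWKEY."""
    match = WINDOWED_KEY.match(rowkey)
    if match is None:
        raise ValueError(f"not a windowed key: {rowkey!r}")
    return match.group("key"), int(match.group("start"))

print(parse_windowed_key("St Louis : Window{start=1519730460000 end=-}"))
```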
Exposing Aggregate Window Keys
CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
ksql> CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
Message
---------------------------
Table created and running
---------------------------
ksql>
Examine table with Timestamp exposed
DESCRIBE RATINGS_BY_CITY_TS;
ksql> describe RATINGS_BY_CITY_TS;
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
WINDOW_START_TS | VARCHAR(STRING)
ADDRESS_CITY | VARCHAR(STRING) (key)
RATING_COUNT | BIGINT
---------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Look at the data
SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
ksql> SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | 2018-02-27 11:21:00 | Geneva | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | 2018-02-27 11:22:00 | Geneva | 11
1519730580000 | Geneva : Window{start=1519730580000 end=-} | 2018-02-27 11:23:00 | Geneva | 14
1519730640000 | Geneva : Window{start=1519730640000 end=-} | 2018-02-27 11:24:00 | Geneva | 10
1519730700000 | Geneva : Window{start=1519730700000 end=-} | 2018-02-27 11:25:00 | Geneva | 19
LIMIT reached for the partition.
Query terminated
ksql>
Check the Kafka Topic
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
{
"WINDOW_START_TS": {
"string": "2018-02-27 11:21:00"
},
"ADDRESS_CITY": {
"string": "Geneva"
},
"RATING_COUNT": {
"long": 11
}
}
Show KSQL Experimental UI
SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;