@rmoff
Last active February 10, 2020 23:41

Live Coding a KSQL Application

Introduction

Diagram of what’s being built with the KSQL demo

This code file accompanies the slides and recording available online at https://www.confluent.io/online-talks/

Don’t forget to check out the #ksql channel on our Community Slack group

——Robin Moffatt @rmoff, 28 Feb 2018

Pre-demo

Built using Confluent Platform 4.0 and KSQL 0.5.

Start a screen session

screen is not obligatory, but very useful :)

screen -S KSQL

Start Confluent Platform

confluent destroy
confluent start
Robin@asgard02 ~/c/confluent-4.0.0> confluent start
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Robin@asgard02 ~/c/confluent-4.0.0>

Populate Users topic

kafkacat -P -b localhost:9092 -t mysql_users -K ":"

Paste the data to stdin:

1:{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
3:{"uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}
4:{"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}
5:{"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
6:{"uid":6,"name":"Hojjat","locale":"en_US","address_city":"Palo Alto","elite":"S"}
7:{"uid":7,"name":"Damian","locale":"en_US","address_city":"London","elite":"S"}
8:{"uid":8,"name":"Apurva","locale":"en_US","address_city":"Palo Alto","elite":"S"}
9:{"uid":9,"name":"Neha","locale":"en_US","address_city":"Palo Alto","elite":"P"}
10:{"uid":10,"name":"Mattias","locale":"DE","address_city":"Palo Alto","elite":"S"}
11:{"uid":11,"name":"Gwen","locale":"en_US","address_city":"Palo Alto","elite":"P"}
12:{"uid":12,"name":"Guozhang","locale":"en_US","address_city":"Palo Alto","elite":"S"}
13:{"uid":13,"name":"Lu","locale":"en_US","address_city":"Palo Alto","elite":"P"}
14:{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}
2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}

Exit: Ctrl-D
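
The -K ":" option tells kafkacat that each input line carries a key and a value separated by a colon. Here is a minimal Python sketch of that split, assuming that only the first delimiter on the line counts (the sample data implies this, since the JSON values contain colons themselves). split_key_value is a hypothetical helper for illustration, not part of kafkacat.

```python
import json
from typing import Tuple


def split_key_value(line: str, delimiter: str = ":") -> Tuple[str, str]:
    """Split one producer input line into (key, value) at the first delimiter."""
    key, value = line.split(delimiter, 1)
    return key, value


line = '2:{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}'
key, value = split_key_value(line)

# The value is still valid JSON even though it contains ':' characters.
user = json.loads(value)
print(key, user["name"], user["address_city"])
```

The key stays a string ("2"), while uid inside the value is a JSON number.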

Inspect Users topic data

kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t mysql_users

kafkacat -C -K: -b localhost:9092 -f 'Key:    %k\nValue:  %s\n\n' -t mysql_users
[...]
Key:    4
Value:  {"uid":4,"name":"Michael","locale":"en_US","address_city":"Geneva","elite":"G"}

Key:    5
Value:  {"uid":5,"name":"Neil","locale":"en_US","address_city":"London","elite":"G"}
[...]

Demo

Note

Create blank window in screen

Check Confluent is running

confluent status

Robin@asgard02 ~/c/confluent-4.0.0> confluent status
Writing temporary config and data files to /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

Run datagen - ratings

~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500

Robin@asgard02 ~/c/confluent-4.0.0> ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
1 --> ([ 1 | 10 | 3 | 9297 | 1519302621843 | 'iOS' | 'your team here rocks!' ])
2 --> ([ 2 | 10 | 1 | 3415 | 1519302622767 | 'android' | 'Surprisingly good, maybe you are getting your mojo back at long last!' ])
3 --> ([ 3 | 17 | 2 | 5197 | 1519302623090 | 'web' | 'meh' ])
4 --> ([ 4 | 9 | 3 | 9389 | 1519302623414 | 'android' | 'is this as good as it gets? really ?' ])

Note

Name screen window ratings data

Start KSQL Server

~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties
$ ~/git/ksql/bin/ksql-server-start ~/git/ksql/ksqlserver.properties

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2018 Confluent Inc.

Server 0.5 listening on http://localhost:8090

To access the KSQL CLI, run:
ksql-cli remote http://localhost:8090

To access the UI, point your browser at:
http://localhost:8090/index.html

Note

Name screen window KSQL Server

Launch KSQL CLI

Note

New screen window

Connect to KSQL server running locally

~/git/ksql/bin/ksql-cli remote http://localhost:8090/

Note

Name screen window KSQL CLI

Robin@asgard02 ~> ~/git/ksql/bin/ksql-cli remote http://localhost:8090/


                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2018 Confluent Inc.

CLI v0.5, Server v0.5 located at http://localhost:8090/

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

See available Kafka topics

show topics;

ksql> show topics;

 Kafka Topic      | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
-----------------------------------------------------------------------------------------------
 _schemas         | false      | 1          | 1                  | 0         | 0
 connect-configs  | false      | 1          | 1                  | 0         | 0
 connect-offsets  | false      | 25         | 1                  | 0         | 0
 connect-statuses | false      | 5          | 1                  | 0         | 0
 ksql__commands   | true       | 1          | 1                  | 0         | 0
 ratings          | false      | 1          | 1                  | 0         | 0
 users            | false      | 1          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------------

Inspect a topic contents - Ratings

Tip

You don’t need to know the format of the data in advance: PRINT detects it and shows the column names and values.

PRINT 'ratings';

Explain the timestamp/key/message concept: each output line shows the message timestamp, then the key, then the value.

ksql> PRINT 'ratings';
Format:AVRO
22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}

Inspect a topic contents - Users

As before, you don’t need to know the format of the data; the column names and values are shown.

PRINT 'mysql_users' FROM BEGINNING;

Tell KSQL to process from beginning of topic

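By default, KSQL queries read only newly arriving messages. Setting auto.offset.reset to earliest makes them process each topic from the beginning.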

SET 'auto.offset.reset' = 'earliest';

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

Register Ratings topic for querying

CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');

Why’s it a stream? Because it’s a continuous, unbounded series of events.

ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');

 Message
---------------
 Table created
---------------

Describe ratings stream

DESCRIBE ratings;

Note:

  1. System columns for timestamp and key

  2. All the other columns have been picked up automagically - have not had to specify them

ksql> DESCRIBE ratings;

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Query ratings stream

SELECT * FROM ratings;

This is a continuous query!

ksql> SELECT * FROM ratings;
1519402268942 | 1 | 1 | 13 | 1 | 3700 | 1519402267832 | ios | airport refurb looks great, will fly outta here more!
1519402269200 | 2 | 2 | 12 | 2 | 9907 | 1519402269200 | android | (expletive deleted)
1519402269694 | 3 | 3 | 2 | 1 | 5421 | 1519402269694 | android | is this as good as it gets? really ?
1519402269857 | 4 | 4 | 18 | 2 | 1462 | 1519402269856 | android | your team here rocks!

Cancel the datagen task - note that the query stops.

Restart the datagen task - query now continues to return data

Filter the ratings stream

SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;

Note the use of LIMIT so that we just see a sample of the stream of data

ksql> SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 5;
1519402272247 | 13 | 13 | 9 | 1 | 2545 | 1519402272247 | iOS | more peanuts please
1519402272750 | 14 | 14 | 0 | 2 | 4419 | 1519402272749 | iOS | airport refurb looks great, will fly outta here more!
1519402273755 | 18 | 18 | 15 | 1 | 5306 | 1519402273755 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
1519402278686 | 37 | 37 | 17 | 1 | 725 | 1519402278686 | iOS | meh
1519402279186 | 39 | 39 | 10 | 1 | 6304 | 1519402279186 | iOS | (expletive deleted)
LIMIT reached for the partition.
Query terminated
ksql>
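
The WHERE plus LIMIT pattern can be pictured as a lazy filter over an unbounded source that stops after a fixed number of matches. The sketch below is only an analogy in plain Python: ratings() is a hypothetical generator standing in for the stream, and its field values are made up for illustration.

```python
from itertools import count, islice


def ratings():
    """Hypothetical endless source standing in for the ratings stream."""
    channels = ["iOS", "android", "web"]
    for rating_id in count(1):
        yield {
            "rating_id": rating_id,
            "stars": rating_id % 5 + 1,
            "channel": channels[rating_id % 3],
        }


# WHERE STARS < 3 AND CHANNEL = 'iOS' (evaluated lazily, one event at a time)
poor_ios = (r for r in ratings() if r["stars"] < 3 and r["channel"] == "iOS")

# LIMIT 5: stop consuming the source after five matching rows
sample = list(islice(poor_ios, 5))
for row in sample:
    print(row)
```

Without the islice call the loop would never finish, which is the same reason an unbounded SELECT keeps running until it is cancelled.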

Persist a filtered stream

Create the stream

Let’s take the poor ratings from people with iOS devices, and create a new stream from them!

CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

 Message
----------------------------
 Stream created and running
----------------------------

Inspect the stream

DESCRIBE POOR_RATINGS;
ksql> DESCRIBE POOR_RATINGS;

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Inspect the stream further

DESCRIBE EXTENDED POOR_RATINGS;
ksql> DESCRIBE EXTENDED POOR_RATINGS;

Type                 : STREAM
Key field            :
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : POOR_RATINGS (partitions: 4, replication: 1)

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------

Queries that write into this STREAM
-----------------------------------
id:CSAS_POOR_RATINGS - CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.33   total-messages:        33     last-message: 27/02/18 11:28:58 GMT
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
ksql>

Query the stream

SELECT * FROM POOR_RATINGS;
ksql> SELECT * FROM POOR_RATINGS;
1519730883770 | 1608 | 1608 | 18 | 2 | 829 | 1519730883770 | iOS | your team here rocks!
1519730881132 | 1596 | 1596 | 6 | 2 | 3185 | 1519730881132 | iOS | thank you for the most friendly, helpful experience today at your new lounge
1519730886192 | 1615 | 1615 | 19 | 1 | 9409 | 1519730886192 | iOS | more peanuts please
1519730880186 | 1591 | 1591 | 7 | 1 | 9830 | 1519730880186 | iOS | is this as good as it gets? really ?
1519730894709 | 1655 | 1655 | 7 | 2 | 6036 | 1519730894709 | iOS | meh

See the Kafka Topic

From the command line, use the standard Kafka tools to interact with the new stream — it’s just a Kafka topic!

kafka-topics --zookeeper localhost:2181 --list
Robin@asgard02 ~> kafka-topics --zookeeper localhost:2181 --list
POOR_RATINGS
__consumer_offsets
_schemas
connect-configs
connect-offsets
connect-statuses
ksql__commands
mysql_users
ratings

Inspect the Kafka topic’s data

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic POOR_RATINGS --from-beginning | jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
                  --bootstrap-server localhost:9092 \
                  --property schema.registry.url=http://localhost:8081 \
                  --topic POOR_RATINGS --from-beginning | jq '.'
{
  "RATING_ID": {
    "long": 1615
  },
  "USER_ID": {
    "int": 19
  },
  "STARS": {
    "int": 1
  },
  "ROUTE_ID": {
    "int": 9409
  },
  "RATING_TIME": {
    "long": 1519730886192
  },
  "CHANNEL": {
    "string": "iOS"
  },
  "MESSAGE": {
    "string": "more peanuts please"
  }
}
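
Each value in the output above is wrapped in a single-entry object such as {"long": 1615}. This is how Avro's JSON encoding represents a value of a union type, which suggests the fields in this schema are nullable unions (an inference from the output, not something the demo states). A minimal Python sketch for flattening such records, using a hypothetical helper unwrap_unions:

```python
import json


def unwrap_unions(record: dict) -> dict:
    """Flatten {"FIELD": {"type": value}} into {"FIELD": value}; null stays None."""
    flat = {}
    for field, wrapped in record.items():
        if isinstance(wrapped, dict) and len(wrapped) == 1:
            # Single-entry dict: key is the Avro branch name, value is the payload.
            flat[field] = next(iter(wrapped.values()))
        else:
            flat[field] = wrapped
    return flat


raw = (
    '{"RATING_ID":{"long":1615},"USER_ID":{"int":19},"STARS":{"int":1},'
    '"CHANNEL":{"string":"iOS"},"MESSAGE":{"string":"more peanuts please"}}'
)
flat = unwrap_unions(json.loads(raw))
print(flat)
```

This shortcut assumes no field legitimately holds a one-entry map or record; a schema-aware decoder would be the safer choice in real code.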

Joining Data in KSQL

Remember our Users data? Let’s bring that into play, and use it to enrich the inbound stream of ratings data.

Inspect Users Data

Let’s check the data first, using the very handy PRINT command:

PRINT 'mysql_users' FROM BEGINNING;

ksql> PRINT 'mysql_users' FROM BEGINNING;
Format:JSON
{"ROWTIME":1519640382207,"ROWKEY":"1","uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"ROWTIME":1519640382207,"ROWKEY":"2","uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
{"ROWTIME":1519640382207,"ROWKEY":"3","uid":3,"name":"Jeremy","locale":"en_US","address_city":"Austin","elite":"P"}

Create Users Table

Now, create a TABLE over the Kafka topic. Why’s it a table? Because for each key (user ID), we want to know its current value (name, address, etc.).

CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');

Note that we’ve specified the KEY here, which must match the key of the Kafka message too.

ksql> CREATE TABLE users (uid INT, name VARCHAR, locale VARCHAR, address_city VARCHAR, elite VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON', KEY='uid');

 Message
---------------
 Table created
---------------
ksql>
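
To illustrate the table idea outside KSQL: replaying the keyed messages in order and keeping only the latest value per key gives the current state. The sketch below is plain Python, not KSQL internals. The messages are taken from the users data loaded earlier (Nick's record is sent three times), and materialize is a hypothetical helper.

```python
import json

# (key, value) pairs in arrival order, as produced to mysql_users earlier
messages = [
    ("2", '{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}'),
    ("14", '{"uid":14,"name":"Robin","locale":"en_US","address_city":"Ilkley","elite":"G"}'),
    ("2", '{"uid":2,"name":"Nick","locale":"en_US","address_city":"San Francisco","elite":"G"}'),
    ("2", '{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"P"}'),
]


def materialize(messages):
    """Build table state: a later message for the same key overwrites the earlier one."""
    table = {}
    for key, value in messages:
        table[key] = json.loads(value)
    return table


users = materialize(messages)
print(users["2"]["address_city"], users["2"]["elite"])
```

A stream over the same topic would keep all four events; the table keeps one row per key.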

Stream-Table join (1)

Now let’s join our ratings data, which includes user ID, to our user information:

Basics to start with — rating message plus the user’s name.

A couple of things to note:

  • We’re aliasing the table and stream names to make column names unambiguous

  • I’m using the backslash line continuation character

SELECT R.MESSAGE, U.NAME \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID \
LIMIT 5;
ksql> SELECT R.MESSAGE, U.NAME \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID \
> LIMIT 5;
airport refurb looks great, will fly outta here more! | Damian
Exceeded all my expectations. Thank you ! | Neil
(expletive deleted) | Lu
why is it so difficult to keep the bathrooms clean ? | Apurva
thank you for the most friendly, helpful experience today at your new lounge | Cliff
LIMIT reached for the partition.
Query terminated
ksql>

Stream-Table join (2)

Now let’s pull the full set of data, including a reformat of the timestamp into something human readable:

Note the IS NOT NULL clause to filter out any ratings with no corresponding user data

SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.CHANNEL, \
> R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
18196 | 1 | 1790 | 2018-02-26 12:42:38 | iOS-test | Surprisingly good, maybe you are getting your mojo back at long last! | Robin | Ilkley | G
18197 | 4 | 2862 | 2018-02-26 12:42:38 | iOS-test | your team here rocks! | Jeremy | Austin | P
18198 | 1 | 6954 | 2018-02-26 12:42:39 | ios | Exceeded all my expectations. Thank you ! | Nick | Palo Alto | P
18199 | 1 | 2092 | 2018-02-26 12:42:39 | ios | (expletive deleted) | Neil | London | G
18200 | 2 | 6042 | 2018-02-26 12:42:39 | ios | more peanuts please | Neil | London | G
18202 | 2 | 1133 | 2018-02-26 12:42:39 | iOS-test | thank you for the most friendly, helpful experience today at your new lounge | Neha | Palo Alto | P
18205 | 3 | 4086 | 2018-02-26 12:42:40 | ios | meh | Neil | London | G
18206 | 1 | 9217 | 2018-02-26 12:42:41 | web | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
18207 | 3 | 2655 | 2018-02-26 12:42:41 | android | more peanuts please | Neha | Palo Alto | P
18209 | 3 | 2086 | 2018-02-26 12:42:42 | iOS-test | meh | Lu | Palo Alto | P
18210 | 2 | 1629 | 2018-02-26 12:42:42 | web | airport refurb looks great, will fly outta here more! | Neha | Palo Alto | P
^CQuery terminated
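
As a rough mental model of the stream-table LEFT JOIN with the IS NOT NULL filter: each rating looks up its user by ID, unmatched ratings get empty user columns, and the filter then drops them. The Python sketch below uses a small hand-made subset of the demo data. The rating rows and the left_join helper are illustrative assumptions, and user 17 is deliberately absent from the users table.

```python
users = {
    1: {"name": "Cliff", "address_city": "St Louis", "elite": "P"},
    5: {"name": "Neil", "address_city": "London", "elite": "G"},
}

ratings = [
    {"rating_id": 1, "user_id": 1, "stars": 1, "message": "more peanuts please"},
    {"rating_id": 2, "user_id": 17, "stars": 3, "message": "meh"},  # no such user
    {"rating_id": 3, "user_id": 5, "stars": 2, "message": "(expletive deleted)"},
]


def left_join(ratings, users):
    """Enrich each rating with user columns; keep the rating even when no user matches."""
    for rating in ratings:
        user = users.get(rating["user_id"]) or {}
        yield {
            **rating,
            "name": user.get("name"),
            "address_city": user.get("address_city"),
            "elite": user.get("elite"),
        }


joined = list(left_join(ratings, users))

# WHERE U.NAME IS NOT NULL: drop ratings that found no user
enriched = [row for row in joined if row["name"] is not None]
for row in enriched:
    print(row["rating_id"], row["message"], row["name"], row["address_city"])
```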

Stream-Table join (3)

Let’s persist this as an enriched stream:

CREATE STREAM RATINGS_FULL AS \
SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
FROM RATINGS R LEFT JOIN USERS U \
ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;
ksql> CREATE STREAM RATINGS_FULL AS \
> SELECT R.RATING_ID, R.STARS, R.ROUTE_ID, TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss') AS RATING_TIME, \
> R.CHANNEL, R.MESSAGE, U.NAME, U.ADDRESS_CITY, U.ELITE \
> FROM RATINGS R LEFT JOIN USERS U \
> ON R.USER_ID = U.UID WHERE U.NAME IS NOT NULL;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Filtering an enriched stream

Which of our Premier customers are not happy?

SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;

ksql> SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;
1519649077925 | 2 | 18684 | 1 | 3895 | 2018-02-26 12:44:37 | ios | thank you for the most friendly, helpful experience today at your new lounge | Nick | Palo Alto | P
1519649078280 | 13 | 18685 | 1 | 4088 | 2018-02-26 12:44:38 | iOS | more peanuts please | Lu | Palo Alto | P
1519649079664 | 1 | 18691 | 2 | 8933 | 2018-02-26 12:44:39 | iOS-test | more peanuts please | Cliff | St Louis | P
1519649084065 | 13 | 18709 | 1 | 4366 | 2018-02-26 12:44:44 | iOS-test | airport refurb looks great, will fly outta here more! | Lu | Palo Alto | P
1519649084603 | 3 | 18712 | 2 | 744 | 2018-02-26 12:44:44 | ios | worst. flight. ever. #neveragain | Jeremy | Austin | P
^CQuery terminated

Persist the filtered & enriched stream

CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;

ksql> CREATE STREAM UNHAPPY_VIPS AS SELECT * FROM RATINGS_FULL WHERE ELITE='P' AND STARS <3;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Query the new stream

SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;

ksql> SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;
1 | why is it so difficult to keep the bathrooms clean ? | Nick
1 | is this as good as it gets? really ? | Jeremy
1 | thank you for the most friendly, helpful experience today at your new lounge | Jeremy
1 | your team here rocks! | Lu
1 | thank you for the most friendly, helpful experience today at your new lounge | Lu
1 | why is it so difficult to keep the bathrooms clean ? | Jeremy

View the underlying topic data

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic UNHAPPY_VIPS --from-beginning
Robin@asgard02 ~> kafka-avro-console-consumer \
                  --bootstrap-server localhost:9092 \
                  --property schema.registry.url=http://localhost:8081 \
                  --topic UNHAPPY_VIPS --from-beginning
{"RATING_ID":{"long":25},"STARS":{"int":1},"ROUTE_ID":{"int":1548},"KSQL_COL_3":{"string":"2018-02-27 11:21:21"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"thank you for the most friendly, helpful experience today at your new lounge"},"NAME":{"string":"Neha"},"ADDRESS_CITY":{"string":"Palo Alto"},"ELITE":{"string":"P"}}
{"RATING_ID":{"long":31},"STARS":{"int":2},"ROUTE_ID":{"int":451},"KSQL_COL_3":{"string":"2018-02-27 11:21:22"},"CHANNEL":{"string":"android"},"MESSAGE":{"string":"Exceeded all my expectations. Thank you !"},"NAME":{"string":"Cliff"},"ADDRESS_CITY":{"string":"St Louis"},"ELITE":{"string":"P"}}

Streaming Aggregates

Explain windowing

  • Tumbling (e.g. every 5 minutes : 00:00, 00:05, 00:10)

  • Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)

  • Session (Sets a timeout for the given key, after which any new data is treated as a new session)
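
The three window types above can be sketched as small Python functions that decide which window or session a timestamp belongs to. This is an illustration of the concepts only: the function names are hypothetical, and the boundary choices (window end exclusive, a new session only when the gap is strictly exceeded) are assumptions rather than documented KSQL behaviour.

```python
MINUTE_MS = 60_000


def tumbling_window_start(ts_ms, size_ms):
    """Tumbling: each timestamp falls into exactly one fixed-size window."""
    return ts_ms - ts_ms % size_ms


def hopping_window_starts(ts_ms, size_ms, advance_ms):
    """Hopping: windows overlap, so one timestamp can belong to several of them."""
    starts = []
    start = ts_ms - ts_ms % advance_ms  # latest window that has already begun
    while start > ts_ms - size_ms and start >= 0:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)


def session_ids(timestamps_ms, gap_ms):
    """Session: for one key, a gap longer than gap_ms starts a new session."""
    ids, current, previous = [], 0, None
    for ts in timestamps_ms:
        if previous is not None and ts - previous > gap_ms:
            current += 1
        ids.append(current)
        previous = ts
    return ids


ts = 7 * MINUTE_MS + 30_000  # 00:07:30
print(tumbling_window_start(ts, 5 * MINUTE_MS))             # window starting 00:05
print(hopping_window_starts(ts, 5 * MINUTE_MS, MINUTE_MS))  # five overlapping windows
print(session_ids([0, 10_000, 100_000, 105_000], 30_000))
```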

Running Count per Minute

Show count of ratings per City, per minute

SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;
Geneva | 11
Geneva | 11
Geneva | 14
Geneva | 10
Geneva | 19

CREATE TABLE

Let’s persist that into a TABLE. So far we’ve only created new objects as a STREAM. A table gives the state of a given key at a given point in time. So here, for each city, in each one-minute window, what’s the total count?

CREATE TABLE RATINGS_BY_CITY AS \
SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
GROUP BY ADDRESS_CITY;
ksql> CREATE TABLE RATINGS_BY_CITY AS \
> SELECT ADDRESS_CITY, COUNT(*) AS RATING_COUNT \
> FROM RATINGS_FULL WINDOW TUMBLING (SIZE 1 MINUTES) \
> GROUP BY ADDRESS_CITY;

 Message
---------------------------
 Table created and running
---------------------------
ksql>

Examine the created table’s columns

DESCRIBE RATINGS_BY_CITY;

Note that the city is denoted as (key)

ksql> DESCRIBE RATINGS_BY_CITY;

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 ADDRESS_CITY | VARCHAR(STRING)  (key)
 RATING_COUNT | BIGINT
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Point out the system columns - ROWTIME and ROWKEY.

Examine the contents of the new table’s columns

SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;

ksql> SELECT ROWTIME, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | Geneva | 11
1519730460000 | Ilkley : Window{start=1519730460000 end=-} | Ilkley | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | Geneva | 11
1519730460000 | St Louis : Window{start=1519730460000 end=-} | St Louis | 9
1519730580000 | Geneva : Window{start=1519730580000 end=-} | Geneva | 14

Using Functions like TIMESTAMPTOSTRING

KSQL comes with a bunch of functions, both scalar and aggregate (like COUNT which we saw previously).

Let’s convert the ROWTIME epoch value to a more readable one:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;
2018-02-27 11:21:00 | London | 25
2018-02-27 11:21:00 | Palo Alto | 71
2018-02-27 11:22:00 | Palo Alto | 89
2018-02-27 11:22:00 | London | 24
2018-02-27 11:23:00 | London | 25
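
For comparison, the same epoch-to-string conversion can be sketched in Python. This assumes the timestamps are rendered in UTC, which matches the GMT output in this demo; timestamp_to_string is a hypothetical helper, and the strftime pattern is the Python counterpart of the 'yyyy-MM-dd HH:mm:ss' format used above.

```python
from datetime import datetime, timezone


def timestamp_to_string(epoch_ms: int) -> str:
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss', assuming UTC."""
    moment = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


# ROWTIME of the first window shown for RATINGS_BY_CITY
print(timestamp_to_string(1519730460000))  # 2018-02-27 11:21:00
```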

Filtering the aggregate table

This table is a first-class object in KSQL, which we can query and filter like any other:

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY WHERE ADDRESS_CITY='Ilkley';
2018-02-27 11:21:00 | Ilkley | 11
2018-02-27 11:22:00 | Ilkley | 6
2018-02-27 11:23:00 | Ilkley | 13
2018-02-27 11:24:00 | Ilkley | 14
2018-02-27 11:25:00 | Ilkley | 15

Note how aggregates update within the current window.
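
One way to picture this updating behaviour: every incoming event increments the count for its (city, window) pair and emits the new total, so the same window can appear several times with a growing count. The Python sketch below is a simplified model under that assumption. The events are made up, running_counts is a hypothetical helper, and a real KSQL server may batch updates rather than emit one row per event.

```python
from collections import defaultdict

MINUTE_MS = 60_000


def running_counts(events, size_ms=MINUTE_MS):
    """Yield (city, window_start, count) after every event, like a table changelog."""
    counts = defaultdict(int)
    for ts_ms, city in events:
        window_start = ts_ms - ts_ms % size_ms  # tumbling window the event falls into
        counts[(city, window_start)] += 1
        yield city, window_start, counts[(city, window_start)]


events = [
    (1519730460000, "Geneva"),
    (1519730465000, "Ilkley"),
    (1519730470000, "Geneva"),  # same city, same minute: count goes to 2
    (1519730520000, "Geneva"),  # next minute: a new window starts at 1
]
updates = list(running_counts(events))
for update in updates:
    print(update)
```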

Check out the Kafka topic

Let’s step out of KSQL for a moment. When you create a STREAM or TABLE in KSQL, it is backed by a Kafka topic. Let’s check this out:

kafka-topics --zookeeper localhost:2181 --list|grep RATINGS

Robin@asgard02 ~/c/confluent-4.0.0> kafka-topics --zookeeper localhost:2181 --list|grep RATINGS
RATINGS_BY_CITY
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-changelog
ksql_query_CTAS_RATING_COUNT_BY_CHANNEL-KSQL_Agg_Query_1519407993703-repartition
[...]

And it’s just a Kafka topic that we can consume from like any other:

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY --from-beginning --max-messages 5| jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
                                                      --bootstrap-server localhost:9092 \
                                                      --property schema.registry.url=http://localhost:8081 \
                                                      --topic RATINGS_BY_CITY2 --from-beginning -max-messages 5| jq '.'
{
  "ADDRESS_CITY": {
    "string": "London"
  },
  "RATING_COUNT": {
    "long": 25
  }
}
{
  "ADDRESS_CITY": {
    "string": "Palo Alto"
  },
  "RATING_COUNT": {
    "long": 71
  }
}

Where’s the window? It’s held in a system column that’s part of the message key, not in the message value. If we wanted to expose it further, we could do a CREATE TABLE based on the TIMESTAMPTOSTRING function, which exposes the system column.

Exposing Aggregate Window Keys

CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;

ksql> CREATE TABLE RATINGS_BY_CITY_TS AS SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ROWKEY, ADDRESS_CITY, RATING_COUNT FROM RATINGS_BY_CITY;

 Message
---------------------------
 Table created and running
---------------------------
ksql>

Examine table with Timestamp exposed

DESCRIBE RATINGS_BY_CITY_TS;

ksql> describe RATINGS_BY_CITY_TS;


 Field           | Type
---------------------------------------------
 ROWTIME         | BIGINT           (system)
 ROWKEY          | VARCHAR(STRING)  (system)
 WINDOW_START_TS | VARCHAR(STRING)
 ADDRESS_CITY    | VARCHAR(STRING)  (key)
 RATING_COUNT    | BIGINT
---------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Look at the data

SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;

ksql> SELECT * FROM RATINGS_BY_CITY_TS LIMIT 5;
1519730460000 | Geneva : Window{start=1519730460000 end=-} | 2018-02-27 11:21:00 | Geneva | 11
1519730520000 | Geneva : Window{start=1519730520000 end=-} | 2018-02-27 11:22:00 | Geneva | 11
1519730580000 | Geneva : Window{start=1519730580000 end=-} | 2018-02-27 11:23:00 | Geneva | 14
1519730640000 | Geneva : Window{start=1519730640000 end=-} | 2018-02-27 11:24:00 | Geneva | 10
1519730700000 | Geneva : Window{start=1519730700000 end=-} | 2018-02-27 11:25:00 | Geneva | 19
LIMIT reached for the partition.
Query terminated
ksql>

Check the Kafka Topic

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
Robin@asgard02 ~> kafka-avro-console-consumer \
                                    --bootstrap-server localhost:9092 \
                                    --property schema.registry.url=http://localhost:8081 \
                                    --topic RATINGS_BY_CITY_TS --from-beginning --max-messages 5| jq '.'
{
  "WINDOW_START_TS": {
    "string": "2018-02-27 11:21:00"
  },
  "ADDRESS_CITY": {
    "string": "Geneva"
  },
  "RATING_COUNT": {
    "long": 11
  }
}

Show KSQL Experimental UI

SELECT STARS, MESSAGE, NAME FROM UNHAPPY_VIPS;