View udaf.java
package com.myorg.ksql.udaf;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "totalStringLength", author = "Confluent", version = "1.0")
public class TotalStringLength {
@UdafFactory(description = "sums the length of strings")
public static Udaf<String, Long> createSumLengthString() {
View define-a-struct.sql
-- Declare the "orders" stream over the JSON-encoded 'orders_topic' topic.
-- The address column is a nested STRUCT made of city, state and zicode.
-- NOTE(review): "zicode" looks like a typo for "zipcode"; renaming it changes
-- the field name read from the JSON payload, so confirm with the producers first.
-- NOTE(review): KEY is spelled 'orderId' while the column is declared as
-- orderid; this assumes the names are matched case-insensitively. Confirm.
CREATE STREAM orders (
  orderid VARCHAR,
  itemid  VARCHAR,
  address STRUCT<city VARCHAR, state VARCHAR, zicode BIGINT>
)
WITH (kafka_topic = 'orders_topic', value_format = 'JSON', key = 'orderId');
View docker.sh
# Run the application jar in a throwaway (--rm) OpenJDK 8 JRE container on the
# streams_streams-net network, bind-mounting the jar from the host.
$ docker container run \
    --rm \
    --network streams_streams-net \
    --volume /docker/host/path/to/app.jar:/docker/container/path/to/app.jar \
    openjdk:8-jre \
    java -jar /docker/container/path/to/app.jar
View gist:548cd72eaec017c475448cb9b2ced258

Notes

  • Good: very fast snapshots, even for large heaps (e.g. it still works when a slower snapshot would cause your JVM process to time out and die)
  • Bad: such a memory snapshot contains significantly less useful information than YourKit's "enhanced" memory snapshots.

Usage

# 1. Find the relevant JVM pid on the target machine
$ ps ...
View live-coding-ksql.adoc

Live Coding a KSQL Application

Introduction

View example.sql
-- Select click events from users whose level is 'Platinum'.
-- user_id exists on both sides of the join (see the ON clause), so the
-- selected column is qualified with the clickstream alias to avoid ambiguity.
-- page and action are left unqualified; their source is not visible here.
-- Note: the WHERE filter on u.level drops rows with no matching user
-- (u.level is NULL for those), so this LEFT JOIN yields the same rows as an
-- inner JOIN would.
SELECT c.user_id, page, action
FROM clickstream c
LEFT JOIN users u ON c.user_id = u.user_id
WHERE u.level = 'Platinum';
View explain.txt
ksql> EXPLAIN ctas_ip_sum;
Type : QUERY
SQL : CREATE TABLE IP_SUM as SELECT ip, SUM(bytes)/1024 as kbytes FROM CLICKSTREAM WINDOW SESSION (300 SECONDS) GROUP BY ip;
Local runtime statistics
------------------------
messages-per-sec: 104.38 total-messages: 14238 last-message: 12/14/17 4:30:42 PM GMT
failed-messages: 0 last-failed: n/a
(Statistics of the local KSQL Server interaction with the Kafka topic IP_SUM)
View describe.txt
ksql> DESCRIBE EXTENDED ip_sum;
Type : TABLE
Key field : CLICKSTREAM.IP
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : JSON
Kafka output topic : IP_SUM (partitions: 4, replication: 1)
Field | Type
-------------------------------------
View avro.sql
-- Register a "pageviews" stream over 'pageviews_topic' with Avro values.
-- No column list is given; presumably the columns come from the topic's
-- registered Avro schema (confirm Schema Registry is configured).
CREATE STREAM pageviews
  WITH (KAFKA_TOPIC = 'pageviews_topic', VALUE_FORMAT = 'AVRO');
View join.sql
-- Source stream of page views (Avro values) and a users table (JSON values)
-- keyed by user_id. The "..." entries are placeholders for columns elided in
-- this example.
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...) WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO');
CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...) WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON');
-- Enrich each page view with the matching users row, if one exists.
-- The pageviews column is declared as user_id; it is exposed in the output
-- under the alias userid.
CREATE STREAM pageviews_enriched AS
SELECT pv.viewtime, pv.user_id AS userid, ... FROM pageviews pv
LEFT JOIN users ON users.user_id = pv.user_id;