Skip to content

Instantly share code, notes, and snippets.

@MichaelDrogalis
Last active September 2, 2024 20:50
Show Gist options
  • Save MichaelDrogalis/08463ed82a4a04015eef03cb483cad26 to your computer and use it in GitHub Desktop.
Save MichaelDrogalis/08463ed82a4a04015eef03cb483cad26 to your computer and use it in GitHub Desktop.

I've been working with Apache Kafka for over 7 years. I inevitably find myself doing the same set of activities while I'm developing or working with someone else's system. Here's a set of Kafka productivity hacks for doing a few things way faster than you're probably doing them now. 🔥

Get the tools

Most of these tricks rely on having KSQL, and the easiest way to get that is with Docker. KSQL let's you work with the data in Kafka in a client/server fashion. It won't attach/disrupt your existing Kafka setup. Launch this Docker Compose file with docker-compose up. Adjust KSQL_BOOTSTRAP_SERVERS to point to your Kafka installation if it's not locally available on port 9092.

---
version: '2'

services:
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    hostname: ksql-server
    container_name: ksql-server
    network_mode: host
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: "127.0.0.1:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    container_name: ksql-cli
    network_mode: host
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true

Now to the fun part.


Show me all my Kafka topics and their partitions, replicas, and consumers

What's in the cluster? No command kafka-topics.sh + a long string of flags needed. No UI needed. It can even approximate how much outbound activity there is on the topics by the consumer count.

ksql> show topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 auctions    | false      | 3          | 1                  | 2         | 1              
 bids        | false      | 12         | 1                  | 3         | 3              
 people      | false      | 6          | 1                  | 3         | 1              
-----------------------------------------------------------------------------------------

Show me the contents of a topic

What's in this topic? Forget kafka-console-consumer.sh. print can find out. It will also guess the serialization format of your data. This is handy if you don't know what the data in the topic is or who produced it.

ksql> print 'bids' from beginning;
Format:JSON
{"ROWTIME":1562876975855,"ROWKEY":"null","item":"glow stick","price_usd":1,"bid_time":"8:13"}
{"ROWTIME":1562876977966,"ROWKEY":"null","item":"magnet","price_usd":6,"bid_time":"8:17"}
{"ROWTIME":1562876971480,"ROWKEY":"null","item":"tweezers","price_usd":4,"bid_time":"8:05"}
{"ROWTIME":1562876969350,"ROWKEY":"null","item":"sponge","price_usd":3,"bid_time":"8:11"}
{"ROWTIME":1562876967096,"ROWKEY":"null","item":"spoon","price_usd":2,"bid_time":"8:07"}
{"ROWTIME":1562876973673,"ROWKEY":"null","item":"key chain","price_usd":5,"bid_time":"8:09"}
...

In this case, it auto-discovered that I have JSON in the topic. ROWKEY and ROWTIME are KSQL's anolog for the key and timestamp of each record.


Create a Kafka topic

No more fishing around the broker's machine for kafka-topics.sh to make a topic. Make one right at the prompt. Can specify partitions/replication factor as usual.

ksql> CREATE STREAM cheese_shipments (shipmentId INT, cheese VARCHAR, shipmentTimestamp VARCHAR)
>    WITH (kafka_topic='cheese_shipments', partitions=3, key = 'shipmentId', value_format='json');

 Message        
----------------
 Stream created 
----------------

This creates a Kafka topic and registers a stream over it, which is basically a schema for data validation. Check it out:

ksql> show streams;

 Stream Name      | Kafka Topic      | Format 
----------------------------------------------
 CHEESE_SHIPMENTS | cheese_shipments | JSON   
----------------------------------------------

It's a stream, but really it's just a topic. There's those cheese_shipments:

ksql> show topics;

 Kafka Topic      | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
----------------------------------------------------------------------------------------------
 auctions         | false      | 3          | 1                  | 2         | 1              
 bids             | false      | 12         | 1                  | 3         | 3              
 cheese_shipments | true       | 3          | 1                  | 0         | 0              
 people           | false      | 6          | 1                  | 3         | 1              
----------------------------------------------------------------------------------------------

If the topic is active, you can get some runtime metrics about what it's doing. I don't have any here though, so it's blank at the bottom.

ksql> describe extended cheese_shipments;

Name                 : CHEESE_SHIPMENTS
Type                 : STREAM
Key field            : SHIPMENTID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : cheese_shipments (partitions: 3, replication: 1)

 Field             | Type                      
-----------------------------------------------
 ROWTIME           | BIGINT           (system) 
 ROWKEY            | VARCHAR(STRING)  (system) 
 SHIPMENTID        | INTEGER                   
 CHEESE            | VARCHAR(STRING)           
 SHIPMENTTIMESTAMP | VARCHAR(STRING)           
-----------------------------------------------

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic cheese_shipments)

Produce messages to a Kafka topic

Gotta get some data in a topic. I can either write a program which uses the producer API (ugh) or figure out how to use kafkacat (I need to relearn that every time). How about insert into instead?

ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (42, 'provolone', 'june 5th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (45, 'chedar', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (47, 'swiss', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (51, 'cooper', 'june 11th 2019');

It's definitely in the topic. Check it out. Note that the topic is keyed by shipment ID and there's 3 partitions, so the order seen here doesn't match how I inserted it.

ksql> print 'cheese_shipments' from beginning;
Format:JSON
{"ROWTIME":1562882124666,"ROWKEY":"45","SHIPMENTID":45,"CHEESE":"chedar","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882127795,"ROWKEY":"45","SHIPMENTID":47,"CHEESE":"swiss","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882078365,"ROWKEY":"42","SHIPMENTID":42,"CHEESE":"provolone","SHIPMENTTIMESTAMP":"june 5th 2019"}
{"ROWTIME":1562882163295,"ROWKEY":"51","SHIPMENTID":51,"CHEESE":"cooper","SHIPMENTTIMESTAMP":"june 11th 2019"}

Validate the schema of messages before producing to a topic

I often wish I had better client-side validation of my messages before I put them on a topic. insert into does that for free because we placed a schema over it. 🎉

ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values ('bad shipment id', 'american', 'june 12th 2019');
Expected type INT32 for field SHIPMENTID but got bad shipment id(STRING)

Do all of this at a distance

Bonus: all of this works over KSQL's REST API. So if the command line doesn't work for your situation, you can take all of this to the programming language you're using. One less container to run, too, so you can reasonably grab the tools you need with one docker run command.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment