
Confluent quickstart practice

Machine setup

You'll need Docker, docker-compose, and curl to run these demos. You might also find jq and similar utilities useful for playing with the RESTful APIs; a quick sanity check follows the list below.

  • Linux: make sure you have >= 8 GB RAM in your instance
  • macOS: configure Docker for at least 8 GB RAM (Docker -> Preferences -> Advanced -> Memory, then Restart Docker)
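
A quick way to check that the tools are in place:

# each of these should print a version string rather than "command not found"
docker --version
docker-compose --version
curl --version
jq --version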

Get Confluent repo and pull images

git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images/examples/cp-all-in-one
git checkout 5.3.0-post
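
Optionally, pre-pull the images now so the first launch doesn't stall on downloads:

docker-compose pull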

Launch components and check the status

docker-compose up -d --build

Once startup is done and you've got the console back, check that every container is up:

docker-compose ps

It should show something like this:

     Name                    Command               State                        Ports                      
-----------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
connect           bash -c if [ ! -d /usr/sha ...   Up      0.0.0.0:8083->8083/tcp, 9092/tcp                
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp                          
ksql-cli          /bin/sh                          Up                                                      
ksql-datagen      bash -c echo Waiting for K ...   Up                                                      
ksql-server       /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                          
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp                          
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                          
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp   
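
If any container shows Exit instead of Up, its logs usually explain why. For example, to look at the broker:

docker-compose logs broker | tail -n 20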

Connect to Control Center in Chrome (it works best with this UI) or Firefox: http://localhost:9021
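
You can also check from the terminal that the UI is serving; once Control Center has finished starting, this typically prints 200:

curl -s -o /dev/null -w "%{http_code}\n" http://localhost:9021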

Create topics

Let's create the users and pageviews topics:

docker-compose exec broker kafka-topics --create --zookeeper \
zookeeper:2181 --replication-factor 1 --partitions 1 --topic users

This should return:

Created topic users.

Now create the pageviews topic:

docker-compose exec broker kafka-topics --create --zookeeper \
zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews

This should return:

Created topic pageviews.
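
You can confirm both topics were created:

docker-compose exec broker kafka-topics --list --zookeeper zookeeper:2181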

Generate data

Let's start a dummy data generator for the pageviews topic:

curl -L -O https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
curl -X POST -H "Content-Type: application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors

This should return something like this:

{"name":"datagen-pageviews","config":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","kafka.topic":"pageviews","quickstart":"pageviews","max.interval":"100","iterations":"10000000","tasks.max":"1","name":"datagen-pageviews"},"tasks":[],"type":"source"}%

You should be able to check connector status by running:

curl http://localhost:8083/connectors/datagen-pageviews | jq .

This should return something like this:

{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "quickstart": "pageviews",
    "tasks.max": "1",
    "name": "datagen-pageviews",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "iterations": "10000000"
  },
  "tasks": [
    {
      "connector": "datagen-pageviews",
      "task": 0
    }
  ],
  "type": "source"
}
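
To see the generated records themselves, you can tail the topic with the Avro console consumer that ships in the schema-registry container (broker:29092 is the internal listener this compose file uses):

docker-compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 --topic pageviews --from-beginning --max-messages 5 \
  --property schema.registry.url=http://localhost:8081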

Let's do the same for the users topic:

curl -L -O https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors
curl http://localhost:8083/connectors/datagen-users | jq .

The last command should return something like this:

{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "quickstart": "users",
    "tasks.max": "1",
    "name": "datagen-users",
    "kafka.topic": "users",
    "max.interval": "1000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "iterations": "10000000"
  },
  "tasks": [
    {
      "connector": "datagen-users",
      "task": 0
    }
  ],
  "type": "source"
}
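
At this point both connectors should be listed by the Connect REST API:

curl http://localhost:8083/connectors | jq .

which should return something like ["datagen-pageviews","datagen-users"].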

Run KSQL examples

First, we need to launch the KSQL CLI:

docker-compose exec ksql-cli ksql http://ksql-server:8088

It will show a welcome screen like this:



                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.3.0, Server v5.3.0 located at http://ksql-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

Look around:

help
server
version
show streams;
show tables;
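
You can also inspect and tail the raw topics without leaving the CLI (print runs until you press Ctrl-C):

show topics;
print 'pageviews' from beginning;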

Let's create a stream and a table on top of the data we're generating:

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,  \
userid VARCHAR) \
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');
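
A quick way to verify the stream is wired up is a bounded query; in this KSQL version SELECT is a continuous query, and LIMIT makes it stop after a few rows:

SELECT pageid FROM pageviews LIMIT 3;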

Let's play with them:

SET 'auto.offset.reset'='earliest';

CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, \
regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
WHERE gender = 'FEMALE';

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';

CREATE TABLE pageviews_regions AS SELECT gender, regionid , \
COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) \
GROUP BY gender, regionid HAVING COUNT(*) > 1;
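
You can watch the windowed counts as they come in (again bounded with LIMIT so the query returns):

SELECT gender, regionid, numusers FROM pageviews_regions LIMIT 5;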

DESCRIBE EXTENDED pageviews;

EXPLAIN pageviews_regions;
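
show queries; lists the persistent queries that the CREATE ... AS SELECT statements above started, and exit leaves the CLI:

show queries;
exit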

Cleanup

Stop containers:

docker-compose stop

Remove the containers to clean up your data (answer y when prompted):

docker-compose rm
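
Alternatively, docker-compose down does both steps at once; adding -v also removes the volumes, if you want a completely clean slate:

docker-compose down -v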