Let's do a simple lab showing how to use Python with Kafka. We will work with create-topic, list-topics, producer, and consumer scripts written in Python.
These files should be set up on your VirtualBox image. You do the work for this lab in the directory ~/kafka-training/lab1.
You can find the latest version of the instructions for Lab1 here.
If you prefer to run the examples on another OS, e.g., OSX, please refer to the Kafka course notes for instructions on how to download labs and run them on OSX.
Note: later versions will likely work, but this example was done with 1.0.0.0. Kafka 1.0.0.0 came out in November 2017. The course was recently upgraded to 1.1.0.
If you are using the VirtualBox image of Linux, we unzipped the Kafka download, put it in ~/kafka-training/, and then renamed the Kafka install folder to kafka. Please do the same if you decide to install Kafka yourself.
You should be using the VirtualBox image.
For our examples we'll use Confluent Open Source.
Please follow these steps to install it properly:
Install Confluent; see the instructions here.
Install librdkafka; see the instructions here.
Install Python; I used Python 2.7.14.
Next, we are going to run ZooKeeper and then run the Kafka server/broker. We will use Python to create a Kafka topic, list Kafka topics, start a Kafka consumer, and start a Kafka producer. We will use some Kafka command-line utilities to send messages via a producer and see these messages via a Kafka consumer started from Python.
You do the work for this lab in the directory ~/kafka-training/lab1.
Kafka relies on ZooKeeper. To keep things simple, we will use a single ZooKeeper node.
Kafka provides a startup script for ZooKeeper called zookeeper-server-start.sh, which is located at ~/kafka-training/kafka/bin/zookeeper-server-start.sh.
The Kafka distribution also provides a ZooKeeper config file which is set up to run a single node.
To run ZooKeeper, we create this script, run-zookeeper.sh, in kafka-training and run it.
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/zookeeper-server-start.sh \
kafka/config/zookeeper.properties
~/kafka-training
$ ./run-zookeeper.sh
Wait about 30 seconds or so for ZooKeeper to start up.
Kafka also provides a startup script for the Kafka server called kafka-server-start.sh, which is located at ~/kafka-training/kafka/bin/kafka-server-start.sh.
The Kafka distribution also provides a Kafka config file which is set up to run a single Kafka node and points to ZooKeeper running on localhost:2181.
To run Kafka, we created the script run-kafka.sh in kafka-training.
Please review it and then run it in another terminal window.
#!/usr/bin/env bash
cd ~/kafka-training
kafka/bin/kafka-server-start.sh \
kafka/config/server.properties
- ACTION Run the script.
~/kafka-training
$ ./run-kafka.sh
Wait about 30 seconds or so for Kafka to start up.
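Instead of waiting a fixed 30 seconds, a script could poll until the server port accepts connections. The sketch below is one possible way to do that with the standard library only; the helper name wait_for_port and its default timings are our own assumptions, not part of the lab files. Note that an open port only shows the process is listening, so a short extra pause may still be sensible.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout.

    Hypothetical helper for this lab; it is not shipped with Kafka.
    """
    deadline = time.time() + timeout
    while True:
        try:
            # Try to open (and immediately close) a TCP connection.
            conn = socket.create_connection((host, port), timeout=interval)
            conn.close()
            return True
        except (socket.error, socket.timeout):
            # Not listening yet: give up after the deadline, else retry.
            if time.time() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("localhost", 2181) would wait for ZooKeeper and wait_for_port("localhost", 9092) for the Kafka broker, assuming the default ports used in this lab.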
Now let's create the topic that we will send records on.
You will write Python code to create a topic called my-topic-python with a replication factor of 1, since we only have one server. We will use thirteen partitions for my-topic-python, which means we could have up to 13 Kafka consumers in a single consumer group reading it in parallel.
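To make the link between partitions and consumers concrete: within one consumer group, each partition is read by at most one consumer, so consumers beyond the partition count sit idle. The following is a simplified round-robin illustration of that idea, not Kafka's actual assignment strategy; the function name assign_partitions is made up for this sketch.

```python
def assign_partitions(num_partitions, num_consumers):
    """Spread partition ids over consumers round-robin.

    Returns one list of partition ids per consumer. This is a toy model,
    not the assignor Kafka really uses.
    """
    assignment = [[] for _ in range(num_consumers)]
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


# With 13 partitions and 15 consumers, two consumers get nothing to read.
idle = [a for a in assign_partitions(13, 15) if not a]
```

With a single consumer, as in this lab, that one consumer is assigned all 13 partitions.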
To run the code, finish creating the script ~/kafka-training/lab1/python/create-topic.py, and run it in another terminal window.
#!/usr/bin/python2.7
import os
from subprocess import call
os.chdir(os.getenv('HOME') + '/kafka-training/kafka/bin/')
# Create a topic
call(["./kafka-topics.sh", "--create", "--zookeeper", "localhost:2181", "--replication-factor",
"1", "--partitions", "13", "--topic", "my-topic-python"])
- ACTION Edit the file ~/kafka-training/lab1/python/create-topic.py so that it creates a topic called my-topic-python.
- ACTION Run create-topic.py from a new terminal window.
~/kafka-training/lab1/python
$ ./create-topic.py
Created topic "my-topic-python".
Notice we created a topic called my-topic-python.
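If you want to create more than one topic, the argument list can be built by a small function instead of being hard-coded. This is only a sketch using the same kafka-topics.sh flags as create-topic.py; the names KAFKA_BIN, build_create_topic_command, and create_topic are hypothetical and assume the lab's directory layout.

```python
import os
from subprocess import call

# Assumed install location from this lab (hypothetical constant name).
KAFKA_BIN = os.path.join(os.path.expanduser("~"), "kafka-training", "kafka", "bin")


def build_create_topic_command(topic, partitions=13, replication_factor=1,
                               zookeeper="localhost:2181"):
    """Return the kafka-topics.sh argument list for creating one topic."""
    return [
        os.path.join(KAFKA_BIN, "kafka-topics.sh"),
        "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def create_topic(topic, **options):
    """Run kafka-topics.sh and return its exit code (0 means success)."""
    return call(build_create_topic_command(topic, **options))
```

Calling create_topic("my-topic-python") would then behave like the script above, provided ZooKeeper and Kafka are running.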
You can see which topics Kafka is managing by using list-topics.py as follows.
Finish creating the code in ~/kafka-training/lab1/python/list-topics.py and run it.
#!/usr/bin/python2.7
import os
from subprocess import call
os.chdir(os.getenv('HOME') + '/kafka-training/kafka/bin/')
# List existing topics
call(["./kafka-topics.sh", "--list", "--zookeeper", "localhost:2181"])
Notice that we have to specify the location of the ZooKeeper cluster node, which is running on localhost port 2181.
- ACTION Edit the file ~/kafka-training/lab1/python/list-topics.py so that it lists all of the topics in Kafka.
- ACTION Run list-topics.py from a new terminal window.
~/kafka-training/lab1/python
$ ./list-topics.py
my-topic-python
You can see the topic my-topic-python in the list of topics.
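The script above only prints the topic names. If a Python program needs to check whether a topic exists, it could capture the output instead. The sketch below assumes kafka-topics.sh prints one topic name per line, as in the session above; KAFKA_BIN, parse_topic_list, and list_topics are hypothetical names.

```python
import os
from subprocess import check_output

# Assumed install location from this lab (hypothetical constant name).
KAFKA_BIN = os.path.join(os.path.expanduser("~"), "kafka-training", "kafka", "bin")


def parse_topic_list(output):
    """Turn the text printed by `kafka-topics.sh --list` into a list of names."""
    return [line.strip() for line in output.splitlines() if line.strip()]


def list_topics(zookeeper="localhost:2181"):
    """Run kafka-topics.sh --list and return the topic names it prints."""
    output = check_output(
        [os.path.join(KAFKA_BIN, "kafka-topics.sh"), "--list",
         "--zookeeper", zookeeper],
        universal_newlines=True,  # return text rather than bytes
    )
    return parse_topic_list(output)
```

With ZooKeeper and Kafka running, "my-topic-python" in list_topics() would tell you whether the topic has been created.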
You will write Python code that calls the Kafka utility kafka-console-producer.sh, which is located at ~/kafka-training/kafka/bin/kafka-console-producer.sh, to send messages to a topic from the command line.
The Kafka distribution provides a command utility to send messages from the command line. It starts an interactive session in the terminal where every line you type is sent to the Kafka topic.
Create the script in ~/kafka-training/lab1/python/start-producer-console.py and run it.
#!/usr/bin/python2.7
import os
from subprocess import call
os.chdir(os.getenv('HOME') + '/kafka-training/kafka/bin/')
# Start the console producer
call(["./kafka-console-producer.sh", "--broker-list", "localhost:9092", "--topic", "my-topic-python"])
Notice that we specify the Kafka node which is running at localhost:9092.
- ACTION Edit the file ~/kafka-training/lab1/python/start-producer-console.py so that it starts the Kafka producer.
- ACTION Run start-producer-console.py from a new terminal window.
~/kafka-training/lab1/python
$ ./start-producer-console.py
This is message 1 with python
This is message 2 with python
This is message 3 with python
Message 4
Message 5
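Because the console producer reads lines from standard input, the same messages could also be sent without typing them, by piping text into the process. The following is a rough sketch of that approach, assuming the lab's directory layout; KAFKA_BIN, format_payload, and send_messages are hypothetical names and are not part of the lab files.

```python
import os
from subprocess import Popen, PIPE

# Assumed install location from this lab (hypothetical constant name).
KAFKA_BIN = os.path.join(os.path.expanduser("~"), "kafka-training", "kafka", "bin")


def format_payload(messages):
    """Join messages into newline-terminated bytes, one record per line."""
    return "".join(message + "\n" for message in messages).encode("utf-8")


def send_messages(messages, topic="my-topic-python", broker="localhost:9092"):
    """Pipe the messages into kafka-console-producer.sh; return its exit code."""
    command = [os.path.join(KAFKA_BIN, "kafka-console-producer.sh"),
               "--broker-list", broker, "--topic", topic]
    producer = Popen(command, stdin=PIPE)
    # communicate() writes the payload, closes stdin, and waits for exit.
    producer.communicate(format_payload(messages))
    return producer.returncode
```

For example, send_messages(["Message 4", "Message 5"]) would send two records, assuming the broker is running at localhost:9092.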
In order to see these messages, we will need to run the consumer console.
You will write Python code that calls the Kafka utility kafka-console-consumer.sh, which is located at ~/kafka-training/kafka/bin/kafka-console-consumer.sh, to receive messages from a topic on the command line.
The Kafka distribution provides a command utility to see messages from the command line. It displays the messages in various modes.
Finish creating the script in ~/kafka-training/lab1/python/start-consumer-console.py and run it.
#!/usr/bin/python2.7
import os
from subprocess import call
os.chdir(os.getenv('HOME') + '/kafka-training/kafka/bin/')
# Start the console consumer, reading the topic from the beginning
call(["./kafka-console-consumer.sh", "--bootstrap-server", "localhost:9092", "--topic",
"my-topic-python", "--from-beginning"])
Notice that we specify the Kafka node which is running at localhost:9092 like we did before, but we also specify --from-beginning to read all of the messages in my-topic-python from the beginning.
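Without --from-beginning, the console consumer only shows messages that arrive after it starts. A small sketch of how a script might make that choice explicit is shown below; KAFKA_BIN and build_consumer_command are hypothetical names, and only flags already used in this lab appear.

```python
import os

# Assumed install location from this lab (hypothetical constant name).
KAFKA_BIN = os.path.join(os.path.expanduser("~"), "kafka-training", "kafka", "bin")


def build_consumer_command(topic, broker="localhost:9092", from_beginning=True):
    """Return the kafka-console-consumer.sh argument list for one topic."""
    command = [os.path.join(KAFKA_BIN, "kafka-console-consumer.sh"),
               "--bootstrap-server", broker, "--topic", topic]
    if from_beginning:
        # Replay everything already stored in the topic, not just new records.
        command.append("--from-beginning")
    return command
```

The resulting list can be passed to subprocess.call, exactly as the script above does.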
- ACTION Edit the file ~/kafka-training/lab1/python/start-consumer-console.py so that it starts the Kafka console consumer.
- ACTION Run start-consumer-console.py from a new terminal window.
~/kafka-training/lab1/python
$ ./start-consumer-console.py
Message 5
This is message 3 with python
This is message 2 with python
This is message 1 with python
Message 4
Notice that the messages are not coming in order. This is because the messages were spread across the 13 partitions, and our single consumer reads from all of them. Order is only guaranteed within a partition.
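The effect can be reproduced with a toy model in plain Python. The sketch below spreads messages over partitions round-robin and then drains the partitions one after another; both functions are made-up illustrations, and the real producer partitioner and consumer fetch order are more involved. It only shows why global order is lost while per-partition order survives.

```python
def spread_round_robin(messages, num_partitions):
    """Place messages into partitions in turn (toy stand-in for a producer)."""
    partitions = [[] for _ in range(num_partitions)]
    for index, message in enumerate(messages):
        partitions[index % num_partitions].append(message)
    return partitions


def read_all(partitions):
    """Drain the partitions one after another (toy stand-in for a consumer)."""
    consumed = []
    for partition in partitions:
        consumed.extend(partition)
    return consumed


sent = ["m1", "m2", "m3", "m4", "m5"]
partitions = spread_round_robin(sent, 2)
received = read_all(partitions)
# received differs from sent, yet each partition kept its own order.
```

If strict ordering matters, a topic with a single partition, or records that share a key so they land in the same partition, would be the usual options.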
To learn about Kafka see Kafka architecture, Kafka topic architecture and Kafka producer architecture.
- What is Kafka?
- Kafka Architecture
- Kafka Topic Architecture
- Kafka Consumer Architecture
- Kafka Producer Architecture
- Kafka Architecture and low level design
- Kafka and Schema Registry
- Kafka and Avro
- Kafka Ecosystem
- Kafka vs. JMS
- Kafka versus Kinesis
- Kafka Tutorial: Using Kafka from the command line
- Kafka Tutorial: Kafka Broker Failover and Consumer Failover
- Kafka Tutorial
- Kafka Tutorial: Writing a Kafka Producer example in Java
- Kafka Tutorial: Writing a Kafka Consumer example in Java
- Kafka Architecture: Log Compaction
We hope you enjoyed this article. Please provide feedback. Cloudurable provides Kafka training, Kafka consulting, Kafka support, and help setting up Kafka clusters in AWS.