@KostyaEsmukov
Last active June 14, 2019 23:54

Lab: Kafka and Network Partitions

  • Question:
    • Does a Kafka cluster of 3 nodes survive a rolling restart of its nodes without losing a single message?
  • Requirements: Vagrant, VirtualBox

Setup

  • Put all of the attached files into a directory and switch to it
  • $ vagrant up
  • Wait until vagrant completes the provisioning

Create a Kafka topic:

vagrant ssh node1 -- /home/kafka/kafka/bin/kafka-topics.sh \
    --create --zookeeper 127.0.0.1:2181 \
    --if-not-exists \
    --replication-factor 3 --partitions 2 \
    --topic mytopic

Experiment

We need two terminal tabs:

  1. vagrant ssh control -- python3 -u /vagrant/consumer.py
  2. vagrant ssh control -- python3 -u /vagrant/producer.py

Run these commands to verify that Kafka works.

The consumer prints the following status line (which keeps updating as messages are consumed from the topic):

missing=[] multiple=[] expected_value=320 value=320: : 321it [00:10, 68.44it/s]

Meaning of the values:

  • missing lists the messages which weren't received;
  • multiple lists the messages which were received multiple times;
  • expected_value is what the consumer expects to receive at the current iteration;
  • value is what has actually been received from Kafka.
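The bookkeeping behind these fields can be sketched as a small standalone function. The name `track` and the demo sequences are mine, not part of the lab scripts, but the update rules mirror the logic in consumer.py (attached below):

```python
from collections import Counter

def track(values):
    """Replay a stream of integer message values and report which are
    missing (never seen) and which arrived more than once."""
    missing = set()
    missing_position = -1  # highest value accounted for so far
    multiple = Counter()
    expected_value = 0
    for value in values:
        # Mark every value up to the highest one seen as potentially missing...
        next_missing_position = max(missing_position, max(expected_value, value))
        for i in range(missing_position + 1, next_missing_position + 1):
            missing.add(i)
        missing_position = next_missing_position
        # ...then cross off the value that actually arrived; if it was
        # already crossed off, it must be a duplicate delivery.
        if value not in missing:
            multiple.update({value: 1})
        missing.discard(value)
        expected_value += 1
    return sorted(missing), sorted(multiple.items())

# A gapless stream is clean; a redelivered 2 and a lost 4 are both caught:
print(track([0, 1, 2, 3]))        # ([], [])
print(track([0, 1, 2, 2, 3, 5]))  # ([4], [(2, 1)])
```

Note that a duplicate only bumps the multiple counter; it never re-adds the value to missing, so a redelivery can't mask a genuine loss.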

Now let's kill one node. Open VirtualBox, find node1, power it off.

Both the producer and the consumer hung for a bit and then resumed. Here are the logs:

consumer:

missing=[] multiple=[] expected_value=14186 value=14186: : 14186it [03:12, 78.67it/s]Failed fetch messages from 1: [Error 7] RequestTimedOutError
missing=[] multiple=[] expected_value=14189 value=14189: : 14190it [03:30, 78.67it/s]Unable connect to node with id 1:
Unable connect to node with id 1:
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
missing=[] multiple=[] expected_value=19352 value=19352: : 19353it [04:58, 77.57it/s]

producer:

14183it [03:07, 78.44it/s]Unable to send 14188
Got error produce response: [Error 7] RequestTimedOutError
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError
14190it [03:20, 78.44it/s]Unable connect to node with id 1:
Unable to send 14190
Unable connect to node with id 1:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable to send 14190
Unable connect to node with id 1:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
20425it [05:06, 78.67it/s]

Now start node1 and wait until it boots. Then power off node2.

On the producer/consumer side the behavior was the same: both hung for a bit and then resumed:

consumer:

missing=[] multiple=[] expected_value=27192 value=27192: : 27192it [06:38, 77.53it/s]Failed fetch messages from 2: [Error 7] RequestTimedOutError
missing=[] multiple=[] expected_value=27195 value=27195: : 27195it [06:51,  1.54it/s]Unable connect to node with id 2:
Unable connect to node with id 2:
Failed fetch messages from 2: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 2).
missing=[] multiple=[] expected_value=31146 value=31146: : 31146it [07:59, 78.75it/s]

producer:

27190it [06:33, 77.12it/s]Got error produce response: [Error 7] RequestTimedOutError
Unable to send 27195
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError
27196it [06:50, 77.12it/s]Unable connect to node with id 2:
Unable connect to node with id 2:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 2).
Unable to send 27196
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 2).
32098it [08:05, 79.08it/s]

Start node2, wait until it boots. Power off node3.

No change in behavior here: both the producer and the consumer resumed after a short pause:

consumer:

missing=[] multiple=[] expected_value=39562 value=39562: : 39562it [09:46, 78.05it/s]Failed fetch messages from 3: [Error 7] RequestTimedOutError
missing=[] multiple=[] expected_value=39563 value=39563: : 39564it [10:00, 78.05it/s]Unable connect to node with id 3:
Failed fetch messages from 3: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 3).
Unable connect to node with id 3:
Failed fetch messages from 3: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 3).
missing=[] multiple=[] expected_value=46863 value=46863: : 46863it [11:48, 79.57it/s]

producer:

39556it [09:40, 78.61it/s]Unable to send 39563
Got error produce response: [Error 7] RequestTimedOutError
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError
39564it [09:50,  2.58it/s]Unable connect to node with id 3:
Unable to send 39564
Unable connect to node with id 3:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 3).
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 3).
48221it [12:00, 78.44it/s]

So... the producer and consumer survived the rolling Kafka cluster restart without any lost or duplicated messages. Yay?...

Now let's really break the cluster: kill 2 of the 3 nodes.

First, start node3, wait until it boots. Then power off node1 and node2.

As expected, both the consumer and the producer stalled (a single ZooKeeper/Kafka node is not enough for a quorum).

consumer:

missing=[] multiple=[] expected_value=65736 value=65736: : 65736it [15:49, 77.31it/s]Failed fetch messages from 1: [Error 7] RequestTimedOutError
missing=[] multiple=[] expected_value=65736 value=65736: : 65737it [16:00, 77.31it/s]Unable connect to node with id 1:
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1:
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1:
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1:
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 113] Connect call failed ('192.168.99.11', 9092)

producer:

65730it [15:43, 77.48it/s]Got error produce response: [Error 7] RequestTimedOutError
Unable to send 65737
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError
65737it [16:00, 77.48it/s]Unable connect to node with id 1:
Unable connect to node with id 1:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable to send 65737
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1:
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable to send 65737
...

Unable to send 65738
Traceback (most recent call last):
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError
Traceback (most recent call last):
  File "/vagrant/producer.py", line 40, in <module>
    loop.run_until_complete(send_one())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/vagrant/producer.py", line 26, in send_one
    await producer.send_and_wait("mytopic", f"{value}".encode())
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/producer/producer.py", line 452, in send_and_wait
    return (yield from future)
kafka.errors.RequestTimedOutError: [Error 7] RequestTimedOutError

The producer process stopped due to an uncaught exception.

Boot node1 and node2.

Until node1 and node2 had booted, the consumer was flooding the log with the following:

Unable connect to node with id 1: [Errno 111] Connect call failed ('192.168.99.11', 9092)
Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).

But once they had booted, it just stalled.

So, apparently, my producer and consumer processes are unable to survive a full Kafka cluster outage.
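A client meant to outlive a full-cluster outage has to keep retrying instead of letting the exception propagate, as the producer's raise on the fifth failed attempt does. A minimal, library-agnostic sketch of such a retry loop, assuming exponential backoff; `send_with_retry`, the backoff values, and the `flaky` fake transport are all illustrative, not part of aiokafka:

```python
import asyncio

async def send_with_retry(send, value, retries=5, base_delay=0.01):
    """Retry an async send callable with exponential backoff.

    `send` is any coroutine function taking the value to deliver; a real
    producer would wrap producer.send_and_wait() here.
    """
    for attempt in range(retries):
        try:
            return await send(value)
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the error to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demo with a fake transport that fails twice before succeeding:
calls = 0
async def flaky(value):
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("broker unavailable")
    return f"delivered {value}"

print(asyncio.run(send_with_retry(flaky, 42)))  # delivered 42
```

For a truly outage-proof client, `retries` would effectively be unbounded (with a capped backoff), so the process simply waits out the downtime instead of dying.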

Let's restart them to ensure that the cluster has recovered:

  1. vagrant ssh control -- python3 -u /vagrant/consumer.py
  2. vagrant ssh control -- python3 -u /vagrant/producer.py

And yes, it has!

consumer:

missing=[] multiple=[] expected_value=3970 value=3970: : 3971it [00:58, 74.01it/s]

producer:

4820it [01:03, 78.20it/s]

Conclusion

The provisioned Kafka/Zookeeper cluster is indeed able to survive a rolling restart without losing any messages (at least I didn't observe any losses).

After a Kafka node is powered off, there were short delays (5-20 seconds) on both the consumer and producer sides.

When a majority of the Kafka nodes is shut down, Kafka stalls until a quorum is restored.
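The quorum arithmetic behind that last observation is worth spelling out: ZooKeeper (and, through it, Kafka's controller election) needs a strict majority of the ensemble alive, so an n-node ensemble tolerates at most floor((n-1)/2) failures. A quick sketch:

```python
def majority(n):
    """Smallest number of nodes that forms a quorum in an n-node ensemble."""
    return n // 2 + 1

def tolerated_failures(n):
    """How many nodes can die before the ensemble loses quorum."""
    return n - majority(n)

for n in (1, 3, 5):
    print(f"{n} nodes: quorum={majority(n)}, survives {tolerated_failures(n)} failure(s)")

# With 3 nodes the quorum is 2: one dead node is survivable, which is
# exactly why killing one node was fine and killing two stalled the cluster.
```

This is also why ensembles are sized with odd numbers: 4 nodes tolerate no more failures than 3 do.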

Attachments

consumer.py:

import asyncio
from collections import Counter

from tqdm import tqdm

from aiokafka import AIOKafkaConsumer

loop = asyncio.get_event_loop()


async def consume():
    consumer = AIOKafkaConsumer(
        "mytopic",
        loop=loop,
        request_timeout_ms=10_000,
        bootstrap_servers="192.168.99.11:9092,192.168.99.12:9092,192.168.99.13:9092",
    )
    await consumer.start()
    try:
        p = tqdm()
        missing = set()
        missing_position = -1
        multiple = Counter()
        expected_value = 0
        print("Ready")
        async for msg in consumer:
            value = int(msg.value)
            next_missing_position = max(missing_position, max(expected_value, value))
            for i in range(missing_position + 1, next_missing_position + 1):
                missing.add(i)
            missing_position = next_missing_position
            if value not in missing:
                multiple.update({value: 1})
            missing.discard(value)
            p.set_description(
                f"missing={sorted(missing)} "
                f"multiple={sorted(multiple.items())} "
                f"expected_value={expected_value} value={value}"
            )
            p.update(n=1)
            expected_value += 1
    finally:
        await consumer.stop()


loop.run_until_complete(consume())
kafka.service:

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c 'exec /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/logs/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
producer.py:

import asyncio
import traceback
from itertools import count

from tqdm import tqdm

from aiokafka import AIOKafkaProducer

loop = asyncio.get_event_loop()


async def send_one():
    producer = AIOKafkaProducer(
        loop=loop,
        request_timeout_ms=10_000,
        bootstrap_servers="192.168.99.11:9092,192.168.99.12:9092,192.168.99.13:9092",
    )
    await producer.start()
    it = iter(count(0))
    p = tqdm()
    try:
        while True:
            value = next(it)
            for i in range(5):
                try:
                    await producer.send_and_wait("mytopic", f"{value}".encode())
                except Exception:
                    print(f"Unable to send {value}")
                    traceback.print_exc()
                    if i == 4:
                        raise
                else:
                    break
            p.update(n=1)
            await asyncio.sleep(0.01)
    finally:
        await producer.stop()


loop.run_until_complete(send_one())
provision_kafka.sh:

#!/bin/sh
set -eux

apt-get update
apt-get install -yq --no-install-recommends \
    htop bmon curl \
    default-jre

# https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-debian-9
useradd kafka -m

# https://kafka.apache.org/downloads
curl 'https://www.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz' \
    -o /home/kafka/kafka.tgz

cd /home/kafka
tar xaf kafka.tgz
mv kafka_* kafka

NODE_ID=`hostname | xargs echo -n | tail -c 1`
LOCAL_IP=`ip addr show eth1 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1`

sed -i "s/^broker\\.id=.*$/broker.id=${NODE_ID}/" kafka/config/server.properties
sed -i 's/^zookeeper.connect=.*$/zookeeper.connect=192.168.99.11:2181,192.168.99.12:2181,192.168.99.13:2181/' kafka/config/server.properties

chown -R root:root kafka
mkdir -p kafka/logs/
chown -R kafka:kafka kafka/logs/

cat >> kafka/config/server.properties <<EOF
listeners=PLAINTEXT://:9092
advertised.host.name=${LOCAL_IP}
EOF

cat > kafka/config/zookeeper.properties <<EOF
dataDir=/data/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
tickTime=2000
maxClientCnxns=60
server.1=192.168.99.11:2888:3888
server.2=192.168.99.12:2888:3888
server.3=192.168.99.13:2888:3888
EOF

mkdir -p /data/zookeeper
echo "$NODE_ID" > /data/zookeeper/myid
chown -R kafka:kafka /data/zookeeper

cp /vagrant/zookeeper.service /etc/systemd/system/zookeeper.service
cp /vagrant/kafka.service /etc/systemd/system/kafka.service
systemctl daemon-reload
systemctl enable zookeeper
systemctl enable kafka
systemctl start zookeeper
systemctl start kafka
Vagrantfile:

Vagrant.configure("2") do |config|
  config.vm.box = "debian/buster64"

  config.vm.provider "virtualbox" do |vb|
    vb.memory = "1024"
  end

  config.vm.define "node1" do |m|
    m.vm.hostname = "node1"
    m.vm.network "private_network", ip: "192.168.99.11"
    m.vm.provision "shell", path: "provision_kafka.sh"
  end

  config.vm.define "node2" do |m|
    m.vm.hostname = "node2"
    m.vm.network "private_network", ip: "192.168.99.12"
    m.vm.provision "shell", path: "provision_kafka.sh"
  end

  config.vm.define "node3" do |m|
    m.vm.hostname = "node3"
    m.vm.network "private_network", ip: "192.168.99.13"
    m.vm.provision "shell", path: "provision_kafka.sh"
  end

  config.vm.define "control" do |m|
    m.vm.hostname = "control"
    m.vm.network "private_network", ip: "192.168.99.20"
    m.vm.provision "shell", inline: <<-SHELL
      apt-get update
      apt-get install -yq python3 python3-pip
      pip3 install tqdm aiokafka
    SHELL
  end
end
zookeeper.service:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target