Step-by-step guide for a multi-node Confluent Kafka Platform and Cassandra cluster.
This is a multi-node deployment of https://github.com/ferhtaydn/sack
Assume we have five Ubuntu 14.04 nodes with the following IPs:
- 12.0.5.4
- 12.0.5.5
- 12.0.5.6
- 12.0.1.170
- 12.0.1.171
All installation and configuration steps are shown below for node 12.0.5.4, but you need to repeat them on every node.
Many problems arise from host settings, so make sure your /etc/hosts
file looks like the following:
127.0.0.1 localhost 12.0.5.4
127.0.0.1 localhost
127.0.0.1 ip-12-0-5-4 #(ip-12-0-5-4 is hostname of your node)
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts
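A quick way to verify name resolution is in place (shown here for localhost; run the same check with your node's own hostname, e.g. ip-12-0-5-4, on each machine):

```shell
# Sanity check: a hostname that does not resolve is a common source of
# Kafka/ZooKeeper/Cassandra startup failures.
getent hosts localhost
```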
Before running any of the nohup commands below, make sure your user owns the following directories and files:
/var/lib/kafka*
/var/lib/zookeeper
/var/log/kafka/
/usr/bin/kafka*
/usr/bin/zookeeper*
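For example, assuming your user is named ubuntu (a placeholder; substitute your own), you could build the ownership commands like this and review them before running:

```shell
# Hypothetical sketch: collect the chown commands into a variable so they can
# be reviewed before executing; "ubuntu" is a placeholder user name.
USER_NAME=ubuntu
CMDS=""
for d in /var/lib/kafka /var/lib/zookeeper /var/log/kafka; do
  CMDS="${CMDS}sudo chown -R $USER_NAME: $d
"
done
printf '%s' "$CMDS"
```

Run the printed commands (and do the same for /usr/bin/kafka* and /usr/bin/zookeeper*) so the nohup-started services can write to their data and log directories.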
Install Java first (these steps follow https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-get-on-ubuntu-16-04):
java -version
sudo apt-get update
sudo apt-get install default-jre
sudo apt-get install default-jdk
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
sudo update-alternatives --config java
sudo vi /etc/environment # add JAVA_HOME="/usr/lib/jvm/java-8-oracle"
source /etc/environment
echo "deb http://www.apache.org/dist/cassandra/debian 36x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra
sudo service cassandra stop
sudo rm -rf /var/lib/cassandra/data/system/*
sudo vi /etc/cassandra/cassandra.yaml
Change the following properties in cassandra.yaml:
seeds: "12.0.5.4,12.0.5.5,12.0.5.6,12.0.1.170,12.0.1.171"
listen_address: 12.0.5.4
rpc_address: 12.0.5.4
endpoint_snitch: GossipingPropertyFileSnitch
auto_bootstrap: false
sudo service cassandra start
sudo nodetool status
cqlsh 12.0.5.4 9042
wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11
Here is your final /etc/kafka/zookeeper.properties file:
dataDir=/var/lib/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# 0.0.0.0 for this node's own entry, since we are configuring node 12.0.5.4
server.1=0.0.0.0:2888:3888
server.2=12.0.5.5:2888:3888
server.3=12.0.5.6:2888:3888
server.4=12.0.1.170:2888:3888
server.5=12.0.1.171:2888:3888
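Since the only per-node difference is which server.N entry becomes 0.0.0.0, the server list can be generated on each node; a small sketch (MY_IP would differ on each machine):

```shell
# Hypothetical helper: emit the server.N lines for zookeeper.properties,
# substituting 0.0.0.0 for the current node's own address.
MY_IP=12.0.5.4
i=1
LINES=""
for ip in 12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171; do
  [ "$ip" = "$MY_IP" ] && ip=0.0.0.0
  LINES="${LINES}server.$i=$ip:2888:3888
"
  i=$((i+1))
done
printf '%s' "$LINES"
```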
echo "1" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.4
echo "2" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.5
echo "3" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.6
echo "4" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.170
echo "5" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.171
(Note: sudo echo "1" > /var/lib/zookeeper/myid would fail, because the redirection runs as your user rather than root; that is why tee is used here.)
Run ZooKeeper on each node to form the cluster:
nohup /usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties > /dev/null 2>&1 &
You can check whether ZooKeeper is up, and its mode (leader or follower), with the command echo stat | nc localhost 2181 | grep Mode
You need to change the following lines in your /etc/kafka/server.properties file:
broker.id=0
port=9092
host.name=12.0.5.4
default.replication.factor=3
delete.topic.enable=true
log.dirs=/var/lib/kafka
zookeeper.connect=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181
broker.id must be incremented for each of the other nodes (e.g. node 12.0.5.5 has broker.id=1).
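Editing broker.id by hand on every node is error-prone; a sed one-liner can set it instead. Sketched below on a temporary copy so it is safe to try; on a real node you would point it (with sudo) at /etc/kafka/server.properties:

```shell
# Hypothetical sketch: set broker.id in a properties file with sed,
# demonstrated on a throwaway temp file.
TMP=$(mktemp)
printf 'broker.id=0\nport=9092\nhost.name=12.0.5.4\n' > "$TMP"
sed -i 's/^broker\.id=.*/broker.id=1/' "$TMP"
grep '^broker.id=' "$TMP"
```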
Run your Kafka server on each node:
nohup /usr/bin/kafka-server-start /etc/kafka/server.properties > /dev/null 2>&1 &
You need to change the following lines in your /etc/schema-registry/schema-registry.properties file:
listeners=http://12.0.5.4:8081
kafkastore.connection.url=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181
kafkastore.topic=_schemas
debug=false
And you can run the Schema Registry like this:
nohup /usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
You need to change the following lines in your /etc/kafka/connect-distributed.properties file if you are using Avro for serialization of the items in your topics:
bootstrap.servers=12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://12.0.5.4:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://12.0.5.4:8081
You could again run Kafka Connect with nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &
but we will use the Cassandra Sink and run Kafka Connect from its directory, with the connector's jar added to the classpath.
You can install the Cassandra Sink connector like this:
##Build the connectors
git clone https://github.com/datamountaineer/stream-reactor
cd stream-reactor
gradle fatJar -x test
##Build the CLI for interacting with Kafka connectors
git clone https://github.com/datamountaineer/kafka-connect-tools
cd kafka-connect-tools
gradle fatJar -x test
cd stream-reactor/kafka-connect-cassandra/build/libs
export CLASSPATH=kafka-connect-cassandra-0.2.2-3.0.1-all.jar
nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &
An example Cassandra Sink properties file, /etc/kafka/connect-cassandra-sink.properties, looks like this:
name=cassandra-sink-products
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=5
topics=product-csv-avro,product-http-avro
connect.cassandra.export.route.query=INSERT INTO products SELECT * FROM product-csv-avro;INSERT INTO products SELECT * FROM product-http-avro
connect.cassandra.contact.points=12.0.5.4
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
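The sink above assumes the demo keyspace and a products table already exist in Cassandra. A hypothetical sketch of the schema (the column names and types are assumptions; they must match the fields of your Avro records):

```sql
CREATE KEYSPACE IF NOT EXISTS demo
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

-- Column names below are placeholders; align them with your Avro schema.
CREATE TABLE IF NOT EXISTS demo.products (
  id bigint PRIMARY KEY,
  name text,
  price double
);
```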
A file source connector properties file, /etc/kafka/connect-file-source.properties, can look like this:
$ cat /etc/kafka/connect-file-source.properties
name=product-csv-source
connector.class=FileStreamSource
tasks.max=5
file=/home/your-user/products/products.csv
topic=product-csv-raw
(Use an absolute path for file; ~ is not expanded in properties files.)
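The file source reads the given file line by line, so each CSV row becomes one raw string message on the product-csv-raw topic. A throwaway example input (the column layout is an assumption):

```shell
# Hypothetical sample input for the FileStreamSource connector; each line
# would become one Kafka message on the product-csv-raw topic.
CSV=$(mktemp)
printf '101,notebook,9.99\n102,pencil,0.49\n' > "$CSV"
cat "$CSV"
```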
cd kafka-connect-tools/build/libs
java -jar kafka-connect-cli-0.7-all.jar ps
java -jar kafka-connect-cli-0.7-all.jar create cassandra-sink-products < /etc/kafka/connect-cassandra-sink.properties
java -jar kafka-connect-cli-0.7-all.jar ps
NOTE: If you add a new connector, you need to submit it to the leader Connect worker; otherwise the API returns the following error:
Error: the Kafka Connect API returned: Cannot complete request because of a conflicting operation (e.g. worker rebalance) (409)
However, the other cluster members still respond successfully to GET /connectors requests.
/usr/bin/kafka-avro-console-consumer --zookeeper 12.0.5.4:2181 --property schema.registry.url=http://12.0.5.4:8081 --topic your-topic-name --from-beginning
Finally, the application configuration of sack (its application.conf) should point at the cluster:
akka {
loglevel = "INFO"
jvm-exit-on-fatal-error = on
}
http {
host = 0.0.0.0
port = 8080
}
kafka {
producer {
bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
acks = "all"
//key.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
//value.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
schema.registry.url = "http://12.0.1.171:8081"
zookeeper.connect = "12.0.1.171:2181"
}
consumer {
bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
schema.registry.url = "http://12.0.1.171:8081"
zookeeper.connect = "12.0.1.171:2181"
enable.auto.commit = false
auto.offset.reset = "earliest"
max.partition.fetch.bytes = "1048576"
schedule.interval = 1000
unconfirmed.timeout = 3000
}
}