@ferhtaydn
Created November 18, 2016 14:22
Confluent Kafka Platform and Cassandra Multi Node Deployment Guide

A step-by-step guide to deploying a multi-node Confluent Kafka Platform and Cassandra cluster.

It is a multi-node deployment of https://github.com/ferhtaydn/sack

Assume that we have five Ubuntu 14.04 nodes with the following IPs:

  • 12.0.5.4
  • 12.0.5.5
  • 12.0.5.6
  • 12.0.1.170
  • 12.0.1.171

All installation and configuration steps below are shown for node 12.0.5.4, but you need to repeat them on every node, substituting that node's own IP.

/etc/hosts

Many problems stem from the host settings, so make sure that your /etc/hosts file looks like this:

127.0.0.1 localhost 12.0.5.4
127.0.0.1 localhost
127.0.0.1 ip-12-0-5-4  #(ip-12-0-5-4 is hostname of your node)

::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts

chown settings

If you run the services as your own (non-root) user, make that user the owner of all the paths below before you run any of the nohup commands:

  • /var/lib/kafka*
  • /var/lib/zookeeper
  • /var/log/kafka/
  • /usr/bin/kafka*
  • /usr/bin/zookeeper*
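
The ownership change above can be scripted. The sketch below assumes a hypothetical service user named kafka and a DRY_RUN switch; neither comes from this guide. By default it only prints the commands, so you can review them first.

```shell
#!/bin/sh
# Sketch: hand the Kafka/ZooKeeper paths listed above to one user.
# SERVICE_USER and DRY_RUN are assumptions made for illustration.
SERVICE_USER="${SERVICE_USER:-kafka}"
DRY_RUN="${DRY_RUN:-1}"

chown_paths() {
    # The globs are unquoted on purpose so the shell expands them;
    # a pattern that matches nothing is passed through literally.
    for path in /var/lib/kafka* /var/lib/zookeeper /var/log/kafka/ \
                /usr/bin/kafka* /usr/bin/zookeeper*; do
        if [ "$DRY_RUN" = "1" ]; then
            echo "sudo chown -R $SERVICE_USER $path"
        else
            sudo chown -R "$SERVICE_USER" "$path"
        fi
    done
}

chown_paths
```

Run it with DRY_RUN=0 once the printed commands look right.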

Install Java

https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-get-on-ubuntu-16-04

java -version
sudo apt-get update
sudo apt-get install default-jre
sudo apt-get install default-jdk

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer

sudo update-alternatives --config java
sudo vi /etc/environment # add JAVA_HOME="/usr/lib/jvm/java-8-oracle"
source /etc/environment

Cassandra Cluster

https://www.digitalocean.com/community/tutorials/how-to-install-cassandra-and-run-a-single-node-cluster-on-ubuntu-14-04

echo "deb http://www.apache.org/dist/cassandra/debian 36x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra
sudo service cassandra stop
sudo rm -rf /var/lib/cassandra/data/system/*
sudo vi /etc/cassandra/cassandra.yaml

Change the following properties in cassandra.yaml:

seeds: "12.0.5.4,12.0.5.5,12.0.5.6,12.0.1.170,12.0.1.171"
listen_address: 12.0.5.4
rpc_address: 12.0.5.4
endpoint_snitch: GossipingPropertyFileSnitch
auto_bootstrap: false

Then start Cassandra and check the cluster:

sudo service cassandra start
sudo nodetool status
cqlsh 12.0.5.4 9042
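
Because listen_address and rpc_address differ on every node, the per-node part of the edit can be scripted. This is a minimal sketch: set_yaml_key and configure_node are hypothetical helpers, and they only touch top-level, uncommented keys. The nested seeds entry and auto_bootstrap still have to be edited by hand.

```shell
#!/bin/sh
# Sketch: set top-level "key: value" lines in a cassandra.yaml-style file.
# set_yaml_key and configure_node are illustrative names, not Cassandra tools.

# $1 = key, $2 = new value, $3 = file
set_yaml_key() {
    key="$1"; value="$2"; file="$3"
    tmp="$(mktemp)"
    # Rewrite only an uncommented line that starts with "key:".
    sed "s|^$key:.*|$key: $value|" "$file" > "$tmp" && cat "$tmp" > "$file"
    rm -f "$tmp"
}

# $1 = this node's IP, $2 = path to cassandra.yaml
configure_node() {
    set_yaml_key listen_address "$1" "$2"
    set_yaml_key rpc_address "$1" "$2"
    set_yaml_key endpoint_snitch GossipingPropertyFileSnitch "$2"
}
```

For example, on node 12.0.5.5 you could run configure_node 12.0.5.5 /etc/cassandra/cassandra.yaml as root, after taking a backup of the file.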

Kafka

wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11

Zookeeper

Your final /etc/kafka/zookeeper.properties file should look like this:

dataDir=/var/lib/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# server.1 is set to 0.0.0.0 here because this file is for node 12.0.5.4
server.1=0.0.0.0:2888:3888
server.2=12.0.5.5:2888:3888
server.3=12.0.5.6:2888:3888
server.4=12.0.1.170:2888:3888
server.5=12.0.1.171:2888:3888
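
Write each node's id to its myid file. The id must match that node's server.N entry:

echo "1" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.4
echo "2" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.5
echo "3" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.6
echo "4" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.170
echo "5" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.171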
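
Since the myid value and the server.N lines both follow from the order of the nodes, they can be derived from a single list. The sketch below assumes the node order used in this guide; NODES, zk_myid and zk_server_lines are illustrative names.

```shell
#!/bin/sh
# Sketch: derive a node's ZooKeeper id and its server.N lines from one list.
NODES="12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171"

# Print the 1-based position of IP $1 in NODES (its myid).
zk_myid() {
    n=0
    for ip in $NODES; do
        n=$((n + 1))
        if [ "$ip" = "$1" ]; then echo "$n"; return 0; fi
    done
    return 1
}

# Print the server.N block for the node whose IP is $1,
# using 0.0.0.0 for that node's own entry.
zk_server_lines() {
    n=0
    for ip in $NODES; do
        n=$((n + 1))
        if [ "$ip" = "$1" ]; then ip="0.0.0.0"; fi
        echo "server.$n=$ip:2888:3888"
    done
}

zk_myid 12.0.5.5          # prints 2
zk_server_lines 12.0.5.5
```

On a node, the id can then be written with zk_myid 12.0.5.5 | sudo tee /var/lib/zookeeper/myid.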

Start ZooKeeper on each node to form the cluster:

nohup /usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties > /dev/null 2>&1 &

You can check whether ZooKeeper is up, and whether the node is a leader or a follower, with the command echo stat | nc localhost 2181 | grep Mode.
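
To run the same check against every node from one machine, something like the following sketch may help. zk_mode and zk_report are hypothetical helper names, and the 2 second nc timeout (-w 2) is an addition that is not part of the command above.

```shell
#!/bin/sh
# Sketch: report the ZooKeeper mode of every node in the ensemble.
NODES="12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171"

# Extract the value of the "Mode:" line from `stat` output on stdin.
zk_mode() {
    sed -n 's/^Mode: *//p'
}

# Ask each node for its mode; print "unreachable" if nothing comes back.
zk_report() {
    for ip in $NODES; do
        mode="$(echo stat | nc -w 2 "$ip" 2181 2>/dev/null | zk_mode)"
        echo "$ip ${mode:-unreachable}"
    done
}
```

Calling zk_report on a healthy five-node ensemble should list one leader and four followers.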

Kafka Server

Change the following lines in your /etc/kafka/server.properties file:

broker.id=0
port=9092
host.name=12.0.5.4
default.replication.factor=3
delete.topic.enable=true
log.dirs=/var/lib/kafka
zookeeper.connect=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181

Increment broker.id for each additional node; for example, node 12.0.5.5 gets broker.id=1.
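
The node-specific lines of server.properties can be generated rather than typed on each machine. This sketch assumes broker ids follow the node order used in this guide, starting at 0; join_hosts, broker_id and broker_lines are illustrative names.

```shell
#!/bin/sh
# Sketch: print the per-node part of server.properties for one IP.
NODES="12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171"

# Join all node IPs as "ip:port,ip:port,..." for port $1.
join_hosts() {
    out=""
    for ip in $NODES; do
        out="${out:+$out,}$ip:$1"
    done
    echo "$out"
}

# Print the 0-based position of IP $1 in NODES, used here as broker.id.
broker_id() {
    n=0
    for ip in $NODES; do
        if [ "$ip" = "$1" ]; then echo "$n"; return 0; fi
        n=$((n + 1))
    done
    return 1
}

# Print the lines that differ per node, plus the shared ZooKeeper list.
broker_lines() {
    id="$(broker_id "$1")" || return 1
    echo "broker.id=$id"
    echo "host.name=$1"
    echo "zookeeper.connect=$(join_hosts 2181)"
}

broker_lines 12.0.5.5
```

The same join_hosts helper with port 9092 yields the bootstrap.servers list used by Kafka Connect and by client applications.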

Start the Kafka server on each node:

nohup /usr/bin/kafka-server-start /etc/kafka/server.properties > /dev/null 2>&1 &

Schema Registry

Change the following lines in your /etc/schema-registry/schema-registry.properties file:

listeners=http://12.0.5.4:8081
kafkastore.connection.url=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181
kafkastore.topic=_schemas
debug=false

Then start Schema Registry:

nohup /usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &

Kafka Connect

If you use Avro to serialize the items in your topics, change the following lines in your /etc/kafka/connect-distributed.properties file:

bootstrap.servers=12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://12.0.5.4:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://12.0.5.4:8081

You could start Kafka Connect now with nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &, but since we will use the Cassandra Sink, we will start Kafka Connect from the connector's directory with its jar added to the classpath.

Cassandra Sink Connector

Build and install the Cassandra Sink connector as follows:

## Build the connectors
git clone https://github.com/datamountaineer/stream-reactor
cd stream-reactor
gradle fatJar -x test
cd ..

## Build the CLI for interacting with Kafka connectors
git clone https://github.com/datamountaineer/kafka-connect-tools
cd kafka-connect-tools
gradle fatJar -x test
cd ..

## Start Kafka Connect with the Cassandra Sink jar on the classpath
cd stream-reactor/kafka-connect-cassandra/build/libs
export CLASSPATH=kafka-connect-cassandra-0.2.2-3.0.1-all.jar
nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &

An example Cassandra Sink properties file, /etc/kafka/connect-cassandra-sink.properties, looks like this:

name=cassandra-sink-products
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=5
topics=product-csv-avro,product-http-avro
connect.cassandra.export.route.query=INSERT INTO products SELECT * FROM product-csv-avro;INSERT INTO products SELECT * FROM product-http-avro
connect.cassandra.contact.points=12.0.5.4
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra

A file source connector properties file, /etc/kafka/connect-file-source.properties, can look like this:

$ cat /etc/kafka/connect-file-source.properties
name=product-csv-source
connector.class=FileStreamSource
tasks.max=5
file=~/products/products.csv
topic=product-csv-raw

Then list and create connectors with the CLI:

cd kafka-connect-tools/build/libs
java -jar kafka-connect-cli-0.7-all.jar ps
java -jar kafka-connect-cli-0.7-all.jar create cassandra-sink-products < /etc/kafka/connect-cassandra-sink.properties
java -jar kafka-connect-cli-0.7-all.jar ps

NOTE: A new connector has to be created through the leader worker of the Connect cluster. Otherwise the request fails with the following error: Error: the Kafka Connect API returned: Cannot complete request because of a conflicting operation (e.g. worker rebalance) (409). The other cluster members still respond successfully to GET /connectors requests.
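
If you do not know which worker is the leader, one workaround is to try each worker's REST endpoint in turn until one accepts the request. The sketch below is an assumption-heavy illustration, not part of the original setup: it talks to the Kafka Connect REST API directly with curl instead of the CLI jar, it assumes the default REST port 8083, and it expects the connector settings in a hypothetical JSON file with "name" and "config" fields. POST_FN exists only so that the HTTP call can be swapped out.

```shell
#!/bin/sh
# Sketch: create a connector on whichever worker accepts the request.
NODES="12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171"
CONNECT_PORT="${CONNECT_PORT:-8083}"   # assumed default Connect REST port

# POST the JSON file $2 to worker $1 and print only the HTTP status code.
post_connector() {
    curl -s -o /dev/null -w '%{http_code}' -X POST \
         -H 'Content-Type: application/json' --data @"$2" \
         "http://$1:$CONNECT_PORT/connectors"
}

# $1 = JSON file. Try each worker; stop at the first 201 (Created).
create_on_any_worker() {
    for ip in $NODES; do
        code="$("${POST_FN:-post_connector}" "$ip" "$1")"
        echo "$ip -> $code"
        if [ "$code" = "201" ]; then return 0; fi
    done
    return 1
}
```

A call such as create_on_any_worker cassandra-sink-products.json (the file name is hypothetical) prints one line per worker tried and returns non-zero if every worker refused.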

Console Consumer

/usr/bin/kafka-avro-console-consumer --zookeeper 12.0.5.4:2181 --property schema.registry.url=http://12.0.5.4:8081 --topic your-topic-name --from-beginning

Akka-based application.conf

akka {

  loglevel = "INFO"

  jvm-exit-on-fatal-error = on

}

http {
  host = 0.0.0.0
  port = 8080
}

kafka {

  producer {
    bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
    acks = "all"
    //key.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    //value.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    schema.registry.url = "http://12.0.1.171:8081"
    zookeeper.connect = "12.0.1.171:2181"
  }

  consumer {
    bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
    schema.registry.url = "http://12.0.1.171:8081"
    zookeeper.connect = "12.0.1.171:2181"
    enable.auto.commit = false
    auto.offset.reset = "earliest"
    max.partition.fetch.bytes = "1048576"
    schedule.interval = 1000
    unconfirmed.timeout = 3000
  }

}