Skip to content

Instantly share code, notes, and snippets.

@MythRen

MythRen/README.md

Last active Jul 2, 2020
Embed
What would you like to do?
zookeeper / kafka cluster 搭建及 kafka 集群加密认证, 客户端加密认证设置方法

zookeeper 与 kafka 集群设置教程, 作为教程, 下文描述的步骤是在 1台 服务器上部署 启动三个 zookeeper 和 kafka 服务以组成模拟集群, 多机上的注意事项会在相关位置说明

⚠️: 该教程完成于内网 10.0.81.9 这台 sensor 机器, 因此以下步骤中提到的 IP 10.0.81.9, 请在自己实验中更换为自己实验用机器的 IP 地址.
⚠️: 标记 FIXME 的均为在实际环境中需要注意的值(比如 ip, 端口, 文件路径), 其余值新增内容均为固定值

1. 创建必要的测试目录

mkdir -p /data/kafka-cluster-test/{kafka,zookeeper}-data-{1,2,3}

2. 下载 kafka binary 并解压

# 这里仍使用项目中使用的版本: kafka_2.11-0.10.0.0
cd /data/kafka-cluster-test && wget https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
tar -xf kafka_2.11-0.10.0.0.tgz && mv kafka_2.11-0.10.0.0 kafka

3. 生成配置 kafka broker / client 认证/加密 所需的证书

# 这里将证书保存在了 /data/kafka-cluster-test/ssl/ 路径下
mkdir -p /data/kafka-cluster-test/ssl/
cd /data/kafka-cluster-test/ssl/
# 该步骤生成 ca 根证书 (用于给其他证书签名做可信证书)
./tests/gen-ssl-certs.sh ca ca-cert Kafka.Cluster
# 该步骤生成 jks 格式的证书文件供 kafka broker 使用
./tests/gen-ssl-certs.sh -k server ca-cert kafka. Kafka.Cluster
# 该步骤生成 pem 格式的证书文件供 python 客户端 及 bro kafka plugin 使用 (librdkafka)
./tests/gen-ssl-certs.sh client ca-cert kafka. Kafka.Cluster
# 该步骤生成 jks 格式的证书文件供 logstash 的 kafka plugin 使用 (java client)
./tests/gen-ssl-certs.sh -k client ca-cert kafka. Kafka.Cluster

4. 配置 zookeeper

⚠️: kafka 的 binary 包中附带了用于启动 zookeeper 的 shell 脚本及 zookeeper 的配置文件, 因为我们是在一台机器上模拟 zookeeper 集群, 因此这里我们将 zookeeper 配置文件复制 3 份并分别修改, 如果是多机部署则直接修改该文件即可.

4.1 拷贝 3 份 zookeeper 的配置文件并准备分别修改
cd /data/kafka-cluster-test/kafka/config
cp zookeeper.properties zookeeper-1.properties
cp zookeeper.properties zookeeper-2.properties
cp zookeeper.properties zookeeper-3.properties
4.2 修改 zookeeper-1.properties 配置文件
  • 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-1
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22181
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 1 >> /data/kafka-cluster-test/zookeeper-data-1/myid
4.3 修改 zookeeper-2.properties 配置文件
  • 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-2
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22182
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 2 >> /data/kafka-cluster-test/zookeeper-data-2/myid
4.4 修改 zookeeper-3.properties 配置文件
  • 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-3
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22183 
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 3 >> /data/kafka-cluster-test/zookeeper-data-3/myid

5. 配置 kafka

⚠️: kafka 的 binary 包中附带了用于启动 kafka 的 shell 脚本及 kafka 的配置文件, 因为我们是在一台机器上模拟 kafka 集群, 因此这里我们将 kafka 配置文件复制 3 份并分别修改, 如果是多机部署则直接修改该文件即可.

5.1 拷贝 3 份 kafka 的配置文件并准备分别修改
cd /data/kafka-cluster-test/kafka/config
cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties
5.2 修改 server-1.properties 配置文件
  • 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=1
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29091
advertised.listeners=SSL://10.0.81.9:29091
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-1
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL
5.3 修改 server-2.properties 配置文件
  • 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=2
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29092
advertised.listeners=SSL://10.0.81.9:29092
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-2
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL
5.4 修改 server-3.properties 配置文件
  • 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=3
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29093
advertised.listeners=SSL://10.0.81.9:29093
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-3
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL

5. 配置防火墙

如果开启了防火墙, 请将以下端口加入到白名单

# FIXME: 如果单机单实例, 每个实例使用相同端口的话, 可能就只需要配置4个:  2181, 2881, 3881, 9091
22181
22182
22183

22881
22882
22883

23881
23882
23883

29091
29092
29093

6. 安装并配置 supervisor 用于启动 zookeeper / kafka 集群

6.1 将如下配置信息写入到 /data/kafka-cluster-test/kafka-cluster-supervisord.conf
[unix_http_server]
file=/data/kafka-cluster-test/supervisor.sock   ; (the path to the socket file)

[supervisord]
logfile=/data/kafka-cluster-test/supervisord.log ; (main log file;default $CWD/supervisord.log)
logfile_maxbytes=50MB        ; (max main logfile bytes b4 rotation;default 50MB)
logfile_backups=10           ; (num of main logfile rotation backups;default 10)
loglevel=info                ; (log level;default info; others: debug,warn,trace)
pidfile=/data/kafka-cluster-test/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
nodaemon=false               ; (start in foreground if true;default false)
minfds=1024                  ; (min. avail startup file descriptors;default 1024)
minprocs=200                 ; (min. avail process descriptors;default 200)

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///data/kafka-cluster-test/supervisor.sock ; use a unix:// URL  for a unix socket

[program:zookeeper-1]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-1.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-1.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900

[program:zookeeper-2]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-2.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-2.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900

[program:zookeeper-3]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-3.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-3.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900

[program:kafka-1]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-1.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-1.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910

[program:kafka-2]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-2.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-2.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910

[program:kafka-3]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-3.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-3.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910

[group:zookeeper]
programs=zookeeper-1,zookeeper-2,zookeeper-3
priority=900


[group:kafka]
programs=kafka-1,kafka-2,kafka-3
priority=910
6.2 启动 supervisor

supervisord -c /data/kafka-cluster-test/kafka-cluster-supervisord.conf

6.3 管理启动的集群进程
  • 使用 supervisorctl -c /data/kafka-cluster-test/kafka-cluster-supervisord.conf 进行查看管理

7 相关客户端配置改造

7.1 python producer
Producer({
    'client.id': 'kafka-cluster-test',
    "enable.auto.commit": True,
    # 以下为需要新增或者调整的参数
    # FIXME: 指定所有的 kafka broker 实例地址
    "bootstrap.servers": '10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093',
    "security.protocol": "SSL",
    # FIXME 注意文件存放路径
    "ssl.ca.location": "ssl/ca-cert",
    # FIXME 注意文件存放路径
    "ssl.certificate.location": "ssl/kafka.client.pem",
    # FIXME 注意文件存放路径
    "ssl.key.location": "ssl/kafka.client.key",
    # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
    "ssl.key.password": "Kafka.Cluster@2018"
})
7.2 python consumer
Consumer({
    'group.id': 'kafka-cluster-test',
    'client.id': 'kafka-cluster-test',
    "enable.auto.commit": True,
    # 以下为需要新增或者调整的参数
    # FIXME: 指定所有的 kafka broker 实例地址
    "bootstrap.servers": '10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093',
    "security.protocol": "SSL",
    # FIXME 注意文件存放路径
    "ssl.ca.location": "ssl/ca-cert",
    # FIXME 注意文件存放路径
    "ssl.certificate.location": "ssl/kafka.client.pem",
    # FIXME 注意文件存放路径
    "ssl.key.location": "ssl/kafka.client.key",
    # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
    "ssl.key.password": "Kafka.Cluster@2018"
})
7.3 bro kafka plugin
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093",
    ["client.id"] = "smaug.raw",
    # 以下为需要新增或者调整的参数
    # FIXME: 指定所有的 kafka broker 实例地址
    ["security.protocol"] = "SSL",
    # FIXME 注意文件存放路径
    ["ssl.ca.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/ca-cert",
    # FIXME 注意文件存放路径
    ["ssl.certificate.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/kafka.client.pem",
    # FIXME 注意文件存放路径
    ["ssl.key.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/kafka.client.key",
    # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
    ["ssl.key.password"] = "Kafka.Cluster@2018"
);
7.4 logstash kafka output plugin
kafka {
  codec => plain {
    format => "%{message}"
  }
  topic_id => "mytopic"
  # 以下为需要新增或者调整的参数
  # FIXME: 指定所有的 kafka broker 实例地址
  bootstrap_servers => "10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093"
  client_id => "smaug.raw"
  security_protocol => "SSL"
  # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
  ssl_key_password => "Kafka.Cluster@2018"
  # FIXME 注意文件存放路径
  ssl_keystore_location => "/data/kafka-cluster-test/ssl/kafka.client.keystore.jks"
  # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
  ssl_keystore_password => "Kafka.Cluster@2018"
  ssl_keystore_type => "JKS"
  # FIXME 注意文件存放路径
  ssl_truststore_location => "/data/kafka-cluster-test/ssl/kafka.client.truststore.jks"
  # FIXME 第 2 步中使用 sh 脚本中的 PASS 值
  ssl_truststore_password => "Kafka.Cluster@2018"
  ssl_truststore_type => "JKS"
}
#!/bin/bash
#
#
# This scripts generates:
# - root CA certificate
# - server certificate and keystore
# - client keys
#
# https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
#
if [[ "$1" == "-k" ]]; then
USE_KEYTOOL=1
shift
else
USE_KEYTOOL=0
fi
OP="$1"
CA_CERT="$2"
PFX="$3"
HOST="$4"
C=NN
ST=NN
L=NN
O=NN
OU=NN
CN="$HOST"
# Password
PASS="Kafka.Cluster@2018"
# Cert validity, in days
VALIDITY=10000
set -e
export LC_ALL=C
if [[ $OP == "ca" && ! -z "$CA_CERT" && ! -z "$3" ]]; then
CN="$3"
openssl req -new -x509 -keyout ${CA_CERT}.key -out $CA_CERT -days $VALIDITY -passin "pass:$PASS" -passout "pass:$PASS" <<EOF
${C}
${ST}
${L}
${O}
${OU}
${CN}
$USER@${CN}
.
.
EOF
elif [[ $OP == "server" && ! -z "$CA_CERT" && ! -z "$PFX" && ! -z "$CN" ]]; then
#Step 1
echo "############ Generating key"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}server.keystore.jks -alias localhost -validity $VALIDITY -genkey <<EOF
$CN
$OU
$O
$L
$ST
$C
yes
yes
EOF
#Step 2
echo "############ Adding CA"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}server.truststore.jks -alias CARoot -import -file $CA_CERT <<EOF
yes
EOF
#Step 3
echo "############ Export certificate"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}server.keystore.jks -alias localhost -certreq -file ${PFX}cert-file
echo "############ Sign certificate"
openssl x509 -req -CA $CA_CERT -CAkey ${CA_CERT}.key -in ${PFX}cert-file -out ${PFX}cert-signed -days $VALIDITY -CAcreateserial -passin "pass:$PASS"
echo "############ Import CA"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}server.keystore.jks -alias CARoot -import -file $CA_CERT <<EOF
yes
EOF
echo "############ Import signed CA"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}server.keystore.jks -alias localhost -import -file ${PFX}cert-signed
elif [[ $OP == "client" && ! -z "$CA_CERT" && ! -z "$PFX" && ! -z "$CN" ]]; then
if [[ $USE_KEYTOOL == 1 ]]; then
echo "############ Creating client truststore"
[[ -f ${PFX}client.truststore.jks ]] || keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}client.truststore.jks -alias CARoot -import -file $CA_CERT <<EOF
yes
EOF
echo "############ Generating key"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}client.keystore.jks -alias localhost -validity $VALIDITY -genkey <<EOF
$CN
$OU
$O
$L
$ST
$C
yes
yes
EOF
echo "########### Export certificate"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}client.keystore.jks -alias localhost -certreq -file ${PFX}cert-file
echo "########### Sign certificate"
openssl x509 -req -CA ${CA_CERT} -CAkey ${CA_CERT}.key -in ${PFX}cert-file -out ${PFX}cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASS
echo "########### Import CA"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}client.keystore.jks -alias CARoot -import -file ${CA_CERT} <<EOF
yes
EOF
echo "########### Import signed CA"
keytool -storepass "$PASS" -keypass "$PASS" -keystore ${PFX}client.keystore.jks -alias localhost -import -file ${PFX}cert-signed
else
# Standard OpenSSL keys
echo "############ Generating key"
openssl genrsa -des3 -passout "pass:$PASS" -out ${PFX}client.key 1024 -days $VALIDITY
echo "############ Generating request"
openssl req -passin "pass:$PASS" -passout "pass:$PASS" -days $VALIDITY -key ${PFX}client.key -new -out ${PFX}client.req \
<<EOF
$C
$ST
$L
$O
$OU
$CN
.
$PASS
.
EOF
echo "########### Signing key"
openssl x509 -req -passin "pass:$PASS" -in ${PFX}client.req -CA $CA_CERT -CAkey ${CA_CERT}.key -CAserial ${CA_CERT}.srl -days $VALIDITY -out ${PFX}client.pem
fi
else
echo "Usage: $0 ca <ca-cert-file> <CN>"
echo " $0 [-k] server|client <ca-cert-file> <file_prefix> <hostname>"
echo ""
echo " -k = Use keytool/Java Keystore, else standard SSL keys"
exit 1
fi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.