Skip to content

Instantly share code, notes, and snippets.

@Lighfer
Last active July 25, 2022 10:12
Show Gist options
  • Save Lighfer/60795d8bc8dbf3345b5e7018b57e1170 to your computer and use it in GitHub Desktop.
Save Lighfer/60795d8bc8dbf3345b5e7018b57e1170 to your computer and use it in GitHub Desktop.

为了方便,将所有的用户名密码统一为Abc12345

最后会开放所有用户的所有权限,如果需要实现真正的权限控制,【授权】那一节请根据实际需求修改命令

生成证书

使用jdk的keytool生成,所有参数的含义keytool --help都带有中文文档,此处不再解释

先生成自签ca证书

keytool -genkeypair -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -dname CN=ca,OU=tfswx,C=cn -ext bc:c
keytool -exportcert -keystore ca.jks -storepass Abc12345 -alias ca -rfc -file ca.cer

可以用下面这条命令查看证书:

keytool -printcert -file ca.cer

生成服务器端证书

keytool -genkeypair -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -validity 365 -dname CN=kafka,OU=tfswx,C=cn
keytool -certreq -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -file kafka.csr
keytool -gencert -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -infile kafka.csr -outfile kafka.cer
keytool -importcert -keystore kafka.truststore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore kafka.keystore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -file kafka.cer

生成客户端证书

keytool -genkeypair -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -validity 365 -dname CN=client,OU=tfswx,C=cn
keytool -certreq -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -file client.csr
keytool -gencert -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -infile client.csr -outfile client.cer
keytool -importcert -keystore client.truststore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore client.keystore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -file client.cer

配置SASL+SSL

1. config/zookeeper.properties

加入以下两行配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

2. config/zookeeper_jaas.conf

创建该文件并加入以下内容:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="Abc12345"
    password="Abc12345"
    user_Abc12345="Abc12345";
};

其中,user_Abc12345="Abc12345"表示创建了一个用户名为Abc12345,密码为Abc12345

3. config/server.properties

添加该配置:

ssl.keystore.location=/path/to/kafka.keystore.jks
ssl.keystore.password=Abc12345
ssl.key.password=Abc12345
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=Abc12345
ssl.client.auth=required

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=PLAINTEXT
ssl.endpoint.indentification.algorithm=HTTPS

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

allow.every.if.no.acl.found=true

allow.create.topics.enable=true

listeners=PLAINTEXT://YOUR.IP:9092,SASL_SSL://YOUR.IP:9093

4. config/kafka_server_jaas.conf

创建该文件并加入以下内容:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="Abc12345"
    password="Abc12345"
    user_Abc12345="Abc12345";
};

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="Abc12345"
    password="Abc12345";
};

5. bin/kafka-server-start.sh

在该文件base_dir=$(dirname $0)下一行添加:

export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf"

6. bin/zookeeper-server-start.sh

在该文件base_dir=$(dirname $0)下一行添加:

export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/zookeeper_jaas.conf -Dzookeeper.sasl.serverconfig=Server"

运行

# 启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 停止zookeeper
bin/zookeeper-server-stop.sh
# 启动kafka
JMX_PORT=9998 bin/kafka-server-start.sh -daemon config/server.properties
# 停止kafka

授权

bin/kafka-acls.sh YOUR.IP:9092 --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --cluster='*'
bin/kafka-acls.sh YOUR.IP:9092 --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --topic='*'
bin/kafka-acls.sh YOUR.IP:9092 --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --group='*'

修改授权后需要重启kafka

客户端配置

// map是KafkaConsumer构造函数的configs参数
map.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSL");
map.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
map.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
map.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  "Abc12345");
map.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
map.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "Abc12345");
map.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "Abc12345");
map.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Abc12345\" password=\"Abc12345\";");
map.put("sasl.mechanism", "PLAIN");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment