For convenience, every username and password in this walkthrough is Abc12345.
At the end, every user is granted every permission. If you need real access control, change the commands in the 【授权】 (authorization) step to match your requirements.
The certificates are generated with the JDK's keytool. The meaning of every parameter is documented (in Chinese as well) by
keytool --help
and is not repeated here.
Generate the CA key pair and self-signed CA certificate. -ext bc:c sets BasicConstraints CA:true (critical); -keyalg RSA is given explicitly because older JDKs default to DSA keys, which limit the usable TLS cipher suites:
keytool -genkeypair -keyalg RSA -keysize 2048 -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -dname CN=ca,OU=tfswx,C=cn -ext bc:c
Export the CA certificate in PEM form (-rfc):
keytool -exportcert -keystore ca.jks -storepass Abc12345 -alias ca -rfc -file ca.cer
You can inspect the exported certificate with:
keytool -printcert -file ca.cer
Broker side: generate the key pair, create a CSR, sign it with the CA, then build the truststore (CA only) and the keystore (CA first, then the signed broker certificate). CN=kafka should be the host name clients connect to; otherwise clients have to disable host name verification (as the client section below does), or the certificate must carry a SubjectAlternativeName, which can be added at signing time with -ext san=ip:YOUR.IP (or san=dns:<hostname>) on the -gencert command. When scripting, add -noprompt to the two CA imports to skip the "Trust this certificate?" question.
keytool -genkeypair -keyalg RSA -keysize 2048 -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -validity 365 -dname CN=kafka,OU=tfswx,C=cn
keytool -certreq -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -file kafka.csr
keytool -gencert -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -infile kafka.csr -outfile kafka.cer
keytool -importcert -keystore kafka.truststore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore kafka.keystore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore kafka.keystore.jks -storepass Abc12345 -alias kafka -keypass Abc12345 -file kafka.cer
Client side, built the same way (the client certificate is what the broker checks when ssl.client.auth=required):
keytool -genkeypair -keyalg RSA -keysize 2048 -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -validity 365 -dname CN=client,OU=tfswx,C=cn
keytool -certreq -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -file client.csr
keytool -gencert -keystore ca.jks -storepass Abc12345 -alias ca -keypass Abc12345 -validity 365 -infile client.csr -outfile client.cer
keytool -importcert -keystore client.truststore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore client.keystore.jks -storepass Abc12345 -alias ca -keypass Abc12345 -file ca.cer
keytool -importcert -keystore client.keystore.jks -storepass Abc12345 -alias client -keypass Abc12345 -file client.cer
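The script below is an added example and not code from the original page: it builds the same CA, broker and client keystores with keytool, but gives the broker certificate a SubjectAlternativeName so clients can keep host name verification switched on, and then checks the result with keytool and, independently, with openssl. The host name kafka.example.com, the address 10.0.0.5, the output directory and the check list are assumptions for illustration; it needs keytool (JDK) and openssl on PATH.

```shell
#!/bin/sh
# Added example, not code from the original page: builds the same CA /
# broker / client keystores with keytool, but gives the broker certificate a
# SubjectAlternativeName so clients can keep host name verification on, and
# then checks the result. Host name, address and directory are assumptions
# for illustration. Needs keytool (JDK) and openssl on PATH.
set -eu

PASS=${PASS:-Abc12345}
# -J passes JVM flags; forcing an English locale keeps keytool's output greppable.
KT="keytool -J-Duser.language=en -J-Duser.country=US"

# issue NAME SAN: key pair + CSR for NAME, signed by ca.jks, then the CA and the
# signed certificate imported back. SAN is a keytool extension string such as
# "san=dns:host,ip:1.2.3.4", or empty for no SAN. Works in the current directory.
issue() {
  name=$1 san=$2 ext=""
  [ -z "$san" ] || ext="-ext $san"
  $KT -genkeypair -keyalg RSA -keysize 2048 -keystore "$name.keystore.jks" -storepass "$PASS" \
      -alias "$name" -keypass "$PASS" -validity 365 -dname "CN=$name,OU=tfswx,C=cn"
  $KT -certreq -keystore "$name.keystore.jks" -storepass "$PASS" -alias "$name" -keypass "$PASS" \
      -file "$name.csr"
  # -rfc writes PEM so openssl can read the result; $ext is intentionally unquoted (two words).
  $KT -gencert -keystore ca.jks -storepass "$PASS" -alias ca -keypass "$PASS" -validity 365 -rfc $ext \
      -infile "$name.csr" -outfile "$name.cer"
  # Truststore: the CA only. Keystore: the CA first (chain root), then the signed leaf.
  $KT -importcert -noprompt -keystore "$name.truststore.jks" -storepass "$PASS" -alias ca -file ca.cer
  $KT -importcert -noprompt -keystore "$name.keystore.jks" -storepass "$PASS" -alias ca -file ca.cer
  $KT -importcert -keystore "$name.keystore.jks" -storepass "$PASS" -alias "$name" -keypass "$PASS" \
      -file "$name.cer"
}

# build_pki DIR HOST IP: create all five keystores in DIR (subshell body, so the cd stays local).
build_pki() (
  mkdir -p "$1" && cd "$1"
  # -ext bc:c marks the CA certificate as a CA (BasicConstraints, critical).
  $KT -genkeypair -keyalg RSA -keysize 2048 -keystore ca.jks -storepass "$PASS" -alias ca \
      -keypass "$PASS" -validity 365 -dname "CN=ca,OU=tfswx,C=cn" -ext bc:c
  $KT -exportcert -keystore ca.jks -storepass "$PASS" -alias ca -rfc -file ca.cer
  issue kafka "san=dns:$2,ip:$3"   # broker: clients verify HOST or IP against this SAN
  issue client ""                  # client: identified by its DN, no SAN needed
)

# verify_pki DIR HOST: status 0 only if every check passes (subshell body, so exit = return).
verify_pki() (
  cd "$1"
  # 1. the broker key entry carries leaf + CA, so the broker will send the full chain
  $KT -list -v -keystore kafka.keystore.jks -storepass "$PASS" -alias kafka |
      grep -q 'Certificate chain length: 2' || exit 1
  # 2. an independent implementation (openssl) accepts the leaf against the exported CA
  openssl verify -CAfile ca.cer kafka.cer >/dev/null || exit 1
  # 3. the leaf names the broker host, which is what https endpoint identification checks
  openssl x509 -in kafka.cer -noout -text | grep -A1 'Subject Alternative Name' |
      grep -q "DNS:$2" || exit 1
  # 4. the client truststore holds exactly one trusted certificate: the CA, not the leaf
  [ "$($KT -list -keystore client.truststore.jks -storepass "$PASS" | grep -c trustedCertEntry)" -eq 1 ] || exit 1
)

# Stand-alone use: sh make_pki.sh ./pki kafka.example.com 10.0.0.5
if [ $# -eq 3 ]; then
  build_pki "$1" "$2" "$3"
  verify_pki "$1" "$2"
  echo "PKI in $1 built and verified"
fi
```

With a SAN on the broker certificate, the client can leave ssl.endpoint.identification.algorithm at its default (https) instead of clearing it, and the broker's own ssl.endpoint.identification.algorithm=HTTPS then also works for inter-broker TLS.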
Add these two lines to config/zookeeper.properties (the file passed to zookeeper-server-start.sh below); they enable the SASL authentication provider and require clients to authenticate with SASL:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
Create config/zookeeper_jaas.conf (the path KAFKA_OPTS points at below) with the following content:
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="Abc12345"
    password="Abc12345"
    user_Abc12345="Abc12345";
};
Here user_Abc12345="Abc12345" defines a ZooKeeper user named Abc12345 with the password Abc12345; the Kafka broker logs in to ZooKeeper with this account (see the Client section of the broker JAAS file below).
Add the following to config/server.properties (the file passed to kafka-server-start.sh below):
ssl.keystore.location=/path/to/kafka.keystore.jks
ssl.keystore.password=Abc12345
ssl.key.password=Abc12345
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=Abc12345
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=PLAINTEXT
ssl.endpoint.identification.algorithm=HTTPS
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=PLAINTEXT://YOUR.IP:9092,SASL_SSL://YOUR.IP:9093
A few points to be aware of:
Key names must be spelled exactly. Kafka only logs a warning for a key it does not recognise and then ignores it, so a misspelled ssl.endpoint.identification.algorithm or allow.everyone.if.no.acl.found silently does nothing.
ssl.client.auth=required makes the SASL_SSL listener demand a client certificate signed by the CA on top of the SASL/PLAIN login, so clients need client.keystore.jks and not only the truststore.
The PLAINTEXT listener on 9092 accepts anyone without authentication (principal User:ANONYMOUS); together with allow.everyone.if.no.acl.found=true and the wildcard ACLs granted below, that is full access. It is kept here so the setup can be tested step by step; remove it once SASL_SSL works, and then switch security.inter.broker.protocol to SASL_SSL (sasl.mechanism.inter.broker.protocol=PLAIN only takes effect once inter-broker traffic uses a SASL listener).
kafka.security.auth.SimpleAclAuthorizer has been deprecated since Kafka 2.4; newer brokers use kafka.security.authorizer.AclAuthorizer in authorizer.class.name.
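As an added example (not from the original page), the following audit script reads a server.properties file with plain POSIX tools and reports the settings that leave a broker open: a PLAINTEXT or SASL_PLAINTEXT listener, allow.everyone.if.no.acl.found missing or true while an authorizer is configured, no host name verification, PLAINTEXT inter-broker traffic, and client certificates not required. It writes two constructed sample files, a weak one that keeps the PLAINTEXT listener and misspells two keys (Kafka ignores keys it does not recognise, so a typo silently disables the setting, and the audit therefore treats the correctly spelled key as absent), and a hardened one, and compares them. The sample file contents and the check list are assumptions; the key names are Kafka's.

```shell
#!/bin/sh
# Added example, not part of the original page: a small audit of broker
# server.properties settings using only POSIX tools (awk, grep, case).
# Sample files are constructed for the example; key names are Kafka's.
set -eu

# prop FILE KEY: value of the last "KEY=value" line in FILE, empty when absent.
prop() {
  awk -v k="$2" 'index($0, (k "=")) == 1 { v = substr($0, length(k) + 2) } END { print v }' "$1"
}

# audit_server_properties FILE: one finding per line; prints nothing when clean.
audit_server_properties() {
  f=$1
  listeners=$(prop "$f" listeners)
  case ",$listeners," in
    *,PLAINTEXT://*) echo "listeners: PLAINTEXT listener accepts unauthenticated clients (principal User:ANONYMOUS)";;
  esac
  case ",$listeners," in
    *,SASL_PLAINTEXT://*) echo "listeners: SASL_PLAINTEXT sends the SASL/PLAIN password in clear";;
  esac
  # Kafka ignores unknown keys, so a misspelled key reads back as absent here.
  if [ -n "$(prop "$f" authorizer.class.name)" ] && [ "$(prop "$f" allow.everyone.if.no.acl.found)" != false ]; then
    echo "allow.everyone.if.no.acl.found: not false, resources without ACLs are open to everyone"
  fi
  case "$(prop "$f" ssl.endpoint.identification.algorithm | tr '[:upper:]' '[:lower:]')" in
    https) ;;
    *) echo "ssl.endpoint.identification.algorithm: not explicitly https (Kafka before 2.0 defaulted to no host name check)";;
  esac
  [ "$(prop "$f" security.inter.broker.protocol)" != PLAINTEXT ] ||
    echo "security.inter.broker.protocol: inter-broker traffic is unencrypted and unauthenticated"
  [ "$(prop "$f" ssl.client.auth)" = required ] ||
    echo "ssl.client.auth: not required, clients are not forced to present a certificate"
}

# count_findings FILE: number of findings, handy for scripts and checks.
count_findings() { audit_server_properties "$1" | grep -c . || true; }

# write_samples DIR: a weak broker config (PLAINTEXT listener kept, two keys
# misspelled) and a hardened one, both constructed for this example.
write_samples() {
  cat > "$1/weak.properties" <<'EOF'
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=PLAINTEXT
ssl.endpoint.indentification.algorithm=HTTPS
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.every.if.no.acl.found=true
listeners=PLAINTEXT://YOUR.IP:9092,SASL_SSL://YOUR.IP:9093
EOF
  cat > "$1/fixed.properties" <<'EOF'
ssl.client.auth=required
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
listeners=SASL_SSL://YOUR.IP:9093
EOF
}

# Stand-alone run: write both samples to a temporary directory and show the difference.
tmp=$(mktemp -d)
write_samples "$tmp"
for f in weak fixed; do
  echo "== $f.properties: $(count_findings "$tmp/$f.properties") finding(s)"
  audit_server_properties "$tmp/$f.properties"
done
rm -r "$tmp"
```

To check a real broker, call audit_server_properties /path/to/config/server.properties; a clean result means the SASL_SSL listener is the only way in and nothing falls back to the allow-everyone default.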
Create config/kafka_server_jaas.conf (the path KAFKA_OPTS points at below) with the following content:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="Abc12345"
    password="Abc12345"
    user_Abc12345="Abc12345";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="Abc12345"
    password="Abc12345";
};
In the KafkaServer section, username/password are the broker's own credentials, used when it connects to other brokers over SASL, and user_Abc12345="Abc12345" is the account clients log in with over SASL/PLAIN. The Client section is how the broker logs in to ZooKeeper and must match a user_ entry in the ZooKeeper Server section above.
In bin/kafka-server-start.sh, add the following on the line after base_dir=$(dirname $0), so the broker JVM loads its JAAS file:
export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf"
In bin/zookeeper-server-start.sh, add the following on the line after base_dir=$(dirname $0) (zookeeper.sasl.serverconfig names the JAAS section ZooKeeper should use):
export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/zookeeper_jaas.conf -Dzookeeper.sasl.serverconfig=Server"
Both scripts pass KAFKA_OPTS through to the JVM, so exporting the variable in the environment before running the start script works as well and avoids editing the scripts.
# start zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# stop zookeeper
bin/zookeeper-server-stop.sh
# start kafka
JMX_PORT=9998 bin/kafka-server-start.sh -daemon config/server.properties
# stop kafka
bin/kafka-server-stop.sh
# 【授权】: grant every user every operation on the cluster, all topics and all groups.
# kafka-acls.sh with SimpleAclAuthorizer writes the ACLs directly into ZooKeeper (zookeeper.connect); it does not talk to a broker.
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --cluster='*'
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --topic='*'
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal 'User:*' --operation ALL --group='*'
The broker has to be restarted after server.properties changes (for example when the authorizer is enabled); ACLs added with kafka-acls.sh are stored in ZooKeeper and picked up by running brokers without a restart.
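The wildcard grants above give every authenticated user (and, over the PLAINTEXT listener, User:ANONYMOUS) everything. The script below is an added example, not part of the original page, of what the 【授权】 step looks like with least privilege: it revokes the three User:* grants and gives one principal write access to one topic and another principal read access to that topic and its consumer group. The functions only build kafka-acls.sh command lines so they can be reviewed or tested first; --apply executes them. The principals User:orders-producer and User:orders-consumer, the topic orders and the group orders-readers are illustrative assumptions (with SASL/PLAIN as configured here the principal is User:<username>, so for this page it would be User:Abc12345); the tool path and ZooKeeper address match the page and can be overridden with KAFKA_ACLS and ZK.

```shell
#!/bin/sh
# Added example, not from the original page: least-privilege ACLs for one
# producer and one consumer, replacing the User:* grants. Names are
# illustrative assumptions; commands are printed, and only run with --apply.
set -eu

KAFKA_ACLS=${KAFKA_ACLS:-bin/kafka-acls.sh}
ZK=${ZK:-127.0.0.1:2181}
AUTHZ="--authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$ZK"

# producer_acls PRINCIPAL TOPIC: Write and Describe on one topic, nothing else.
producer_acls() {
  echo "$KAFKA_ACLS $AUTHZ --add --allow-principal $1 --operation Write --operation Describe --topic $2"
}

# consumer_acls PRINCIPAL TOPIC GROUP: Read and Describe on the topic, plus Read
# on the consumer group the client commits offsets to.
consumer_acls() {
  echo "$KAFKA_ACLS $AUTHZ --add --allow-principal $1 --operation Read --operation Describe --topic $2"
  echo "$KAFKA_ACLS $AUTHZ --add --allow-principal $1 --operation Read --group $3"
}

# revoke_wildcard_acls: remove the three User:* grants; --force skips the
# interactive confirmation that --remove otherwise asks for.
revoke_wildcard_acls() {
  for res in "--cluster" "--topic '*'" "--group '*'"; do
    echo "$KAFKA_ACLS $AUTHZ --remove --force --allow-principal 'User:*' --operation All $res"
  done
}

# emit_plan PRODUCER TOPIC CONSUMER GROUP: the complete command list, one per line.
emit_plan() {
  revoke_wildcard_acls
  producer_acls "$1" "$2"
  consumer_acls "$3" "$2" "$4"
}

# Stand-alone use: print the plan, or execute it line by line with --apply.
if [ "${1:-}" = --apply ]; then
  emit_plan User:orders-producer orders User:orders-consumer orders-readers |
    while IFS= read -r cmd; do eval "$cmd"; done
else
  emit_plan User:orders-producer orders User:orders-consumer orders-readers
fi
```

Once the wildcard grants are gone, leave allow.everyone.if.no.acl.found=false so that a resource without an ACL is denied rather than open, and add a principal for each service rather than sharing Abc12345.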
// map is the configs argument of the KafkaConsumer constructor.
// Needs: import java.util.HashMap; import java.util.Map;
//        import org.apache.kafka.clients.CommonClientConfigs;
//        import org.apache.kafka.common.config.SaslConfigs;
//        import org.apache.kafka.common.config.SslConfigs;
Map<String, Object> map = new HashMap<>();
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "YOUR.IP:9093"); // the SASL_SSL listener, not the PLAINTEXT one
map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
map.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // an explicit TLS version rather than the legacy "SSL" alias
// "" disables host name verification; needed because the broker certificate's CN is "kafka", not YOUR.IP.
// Better: issue the broker certificate with a SAN for YOUR.IP and drop this line (default is https).
map.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
map.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks"); // trusts only ca.cer
map.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Abc12345");
map.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks"); // presented because ssl.client.auth=required
map.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "Abc12345");
map.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "Abc12345");
// SASL/PLAIN credentials; the password travels inside the TLS session, which is why SASL_SSL is used and not SASL_PLAINTEXT.
map.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"Abc12345\" password=\"Abc12345\";");
map.put(SaslConfigs.SASL_MECHANISM, "PLAIN");