Welcome to session 8, lab 3. The work for this lab is done in ~/kafka-training/lab8.3.
In this lab, you are going to set up Kafka SASL PLAIN authentication.
Please refer to the Kafka course notes for any updates or changes to this lab.
You should use SASL/PLAIN with SSL as the transport layer to ensure that clear-text passwords are not transmitted on the wire.
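To see why the TLS transport matters, the JDK's own SASL client (no Kafka dependency; the class name below is just for illustration) shows exactly what a PLAIN exchange puts on the wire:

```java
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import java.nio.charset.StandardCharsets;

public class PlainWireFormat {
    public static void main(String[] args) throws Exception {
        // Supply the same credentials the lab uses for the admin user.
        CallbackHandler handler = callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) {
                    ((NameCallback) cb).setName("admin");
                } else if (cb instanceof PasswordCallback) {
                    ((PasswordCallback) cb).setPassword("kafka-123".toCharArray());
                }
            }
        };
        SaslClient client = Sasl.createSaslClient(
                new String[]{"PLAIN"}, null, "kafka", "localhost", null, handler);
        // The initial response is authzid NUL authcid NUL password -- all clear text.
        byte[] initial = client.evaluateChallenge(new byte[0]);
        System.out.println(new String(initial, StandardCharsets.UTF_8).replace('\0', '|'));
    }
}
```

The printed line shows the NUL-separated fields (rendered here with `|` in place of NUL): an empty authorization id, the username, and the password, unencrypted. Anyone who can read the TCP stream can read the password, which is why the lab only exposes PLAIN behind SASL_SSL.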
The Kafka default implementation of SASL/PLAIN specifies usernames and passwords in JAAS config files.
To avoid storing passwords on disk, you could define and use your own implementation of javax.security.auth.spi.LoginModule, or use disk encryption and Unix permissions to protect the usernames and passwords.
In production systems, external authentication servers may implement password authentication. Kafka brokers can be integrated with these servers by adding your own implementation of javax.security.sasl.SaslServer. The default implementation included in Kafka, in the package org.apache.kafka.common.security.plain, can be used as an example.
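A custom LoginModule only needs to implement five methods. A minimal skeleton follows; the class name and the idea of fetching credentials from an external secret store are hypothetical, not part of this lab:

```java
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
import java.util.Map;

// Hypothetical skeleton: credentials come from an external store
// rather than sitting in the JAAS config file on disk.
public class VaultBackedLoginModule implements LoginModule {
    private Subject subject;

    @Override
    public void initialize(Subject subject, CallbackHandler handler,
                           Map<String, ?> sharedState, Map<String, ?> options) {
        this.subject = subject;
        // Read non-secret options (e.g. the store's URL) from the JAAS entry;
        // the secrets themselves stay out of the config file.
    }

    @Override
    public boolean login() throws LoginException {
        // Look up the username/password from the external secret store here
        // and publish them the way PlainLoginModule does.
        return true;
    }

    @Override public boolean commit() { return true; }
    @Override public boolean abort()  { return true; }
    @Override public boolean logout() { return true; }
}
```

You would then reference the class by name in the JAAS entry, in place of org.apache.kafka.common.security.plain.PlainLoginModule.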
To log into ZooKeeper, you will need the user admin and a password (kafka-123).
You configure this via a JAAS file called zookeeper_jaas.conf, which will live under /opt/kafka/conf/security/zookeeper_jaas.conf.
// Zookeeper server authentication
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="kafka-123"
user_admin="kafka-123";
};
Note we are using PlainLoginModule from Kafka. The username and password entries are the credentials this server itself presents, while each user_<name>="<password>" entry defines a user that is allowed to connect (here, admin with password kafka-123).
ACTION - EDIT resources/opt/kafka/conf/security/zookeeper_jaas.conf and follow instructions in file
We need ZooKeeper to use org.apache.zookeeper.server.auth.SASLAuthenticationProvider as its authProvider. This authProvider requires a JAAS login via SASL, which we configure in config/zookeeper.properties.
dataDir=/tmp/zookeeper-secure2
clientPort=2181
maxClientCnxns=0
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Note we are using SASLAuthenticationProvider from ZooKeeper.
We need to copy the JAAS config files to /opt/kafka/conf/security ( cp -R resources/opt/kafka/conf/security /opt/kafka/conf/ ).
KAFKA_OPTS is used by the Kafka startup scripts to pass extra arguments to the JVM.
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
export KAFKA_JAAS_FILE="/opt/kafka/conf/security/zookeeper_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_JAAS_FILE"
## Run ZooKeeper
kafka/bin/zookeeper-server-start.sh \
"$CONFIG/zookeeper.properties"
We will also need a JAAS config file for the broker, which will live under /opt/kafka/conf/security/kafka_broker_jaas.conf.
This JAAS config file sets up the admin user for ZooKeeper and for inter-broker communication, and defines users for consumers and producers (stocks_consumer and stocks_producer).
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="kafka-123"
user_admin="kafka-123"
user_stocks_consumer="consumer123"
user_stocks_producer="producer123";
};
// Zookeeper client authentication
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="kafka-123";
};
ACTION - EDIT resources/opt/kafka/conf/security/kafka_broker_jaas.conf and follow instructions in file
We will need to edit the config files config/server-0.properties, config/server-1.properties, and config/server-2.properties to enable SASL support with the PLAIN mechanism. Inter-broker communication uses SASL_SSL, and producers and consumers are configured to connect to ports 10092, 10093, and 10094 with the SASL_SSL protocol.
broker.id=0
listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:10092
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required
log.dirs=./logs/kafka-0
default.replication.factor=3
num.partitions=8
min.insync.replicas=2
auto.create.topics.enable=false
broker.rack=us-west2-a
queued.max.requests=1000
auto.leader.rebalance.enable=true
zookeeper.connect=localhost:2181
delete.topic.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
broker.id=1
listeners=PLAINTEXT://localhost:9093,SASL_SSL://localhost:10093
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required
log.dirs=./logs/kafka-1
min.insync.replicas=1
auto.create.topics.enable=false
zookeeper.connect=localhost:2181
num.partitions=1
delete.topic.enable=true
broker.rack=rack1
auto.leader.rebalance.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
broker.id=2
listeners=PLAINTEXT://localhost:9094,SASL_SSL://localhost:10094
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required
log.dirs=./logs/kafka-2
min.insync.replicas=1
auto.create.topics.enable=true
zookeeper.connect=localhost:2181
num.partitions=1
delete.topic.enable=true
broker.rack=rack2
auto.leader.rebalance.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
export KAFKA_JAAS_FILE="/opt/kafka/conf/security/kafka_broker_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_JAAS_FILE"
## Run Kafka for 1st Server
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-0.properties"
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
export KAFKA_JAAS_FILE="/opt/kafka/conf/security/kafka_broker_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_JAAS_FILE"
## Run Kafka for 2nd Server
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-1.properties"
#!/usr/bin/env bash
CONFIG=`pwd`/config
cd ~/kafka-training
export KAFKA_JAAS_FILE="/opt/kafka/conf/security/kafka_broker_jaas.conf"
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_JAAS_FILE"
## Run Kafka for 3rd Server
kafka/bin/kafka-server-start.sh \
"$CONFIG/server-2.properties"
We will need to configure the consumer's username and password in a JAAS file so it can log into the Kafka brokers.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="stocks_consumer"
password="consumer123";
};
ACTION - EDIT resources/opt/kafka/conf/security/kafka_consumer_stocks_jaas.conf and follow directions
package com.cloudurable.kafka.consumer;
import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class ConsumerUtil {
public static final String BROKERS = "localhost:10092,localhost:10093";
private static Consumer<String, StockPrice> createConsumer(
final String bootstrapServers, final String clientId ) {
System.setProperty("java.security.auth.login.config",
"/opt/kafka/conf/security/kafka_consumer_stocks_jaas.conf");
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.keystore.location",
"/opt/kafka/conf/certs/kafka.keystore");
props.put("ssl.keystore.password", "kafka123");
props.put("ssl.truststore.location",
"/opt/kafka/conf/certs/kafka.truststore");
props.put("ssl.truststore.password", "kafka123");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"StockPriceConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StockDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
return new KafkaConsumer<>(props);
}
...
}
We will need to configure the producer's username and password in a JAAS file so it can log into the Kafka brokers.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="stocks_producer"
password="producer123";
};
ACTION - EDIT resources/opt/kafka/conf/security/kafka_producer_stocks_jaas.conf and follow directions
~/kafka-training/labs/lab8.3/src/main/java/com/cloudurable/kafka/producer/support/StockPriceProducerUtils.java
package com.cloudurable.kafka.producer.support;
import com.cloudurable.kafka.model.StockPrice;
import io.advantageous.boon.core.Lists;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class StockPriceProducerUtils {
private static Producer<String, StockPrice> createProducer() {
System.setProperty("java.security.auth.login.config",
"/opt/kafka/conf/security/kafka_producer_stocks_jaas.conf");
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:10092,localhost:10093");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.keystore.location",
"/opt/kafka/conf/certs/kafka.keystore");
props.put("ssl.keystore.password", "kafka123");
props.put("ssl.truststore.location",
"/opt/kafka/conf/certs/kafka.truststore");
props.put("ssl.truststore.password", "kafka123");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "StockPriceProducerUtils");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StockPriceSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16_384 * 4);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
return new KafkaProducer<>(props);
}
...
}
ACTION - EDIT src/main/java/com/cloudurable/kafka/producer/support/StockPriceProducerUtils.java and follow directions
ACTION - RUN ZooKeeper and three Kafka Brokers (scripts are under bin for ZooKeeper and Kafka Brokers).
You should be able to send records from the producer to the broker, and read records back from the broker with the consumer, using SASL PLAIN authentication.
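As a final sanity check, you can also point a client at one of the SASL_SSL ports with a client properties file along these lines (values taken from the configs above; the sasl.jaas.config client property is an alternative to setting java.security.auth.login.config and requires Kafka 0.10.2 or later):

```properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="stocks_consumer" password="consumer123";
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
# ssl.client.auth=required on the brokers means clients must present a keystore too.
ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
```

If authentication is misconfigured, the client fails fast with a SASL authentication error rather than hanging, which makes this an easy way to verify each user's credentials.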