Skip to content

Instantly share code, notes, and snippets.

@gilmacieljr
Last active May 7, 2018 14:20
Show Gist options
  • Save gilmacieljr/1dbe21ea31903d1fa8184e08710760c0 to your computer and use it in GitHub Desktop.
Save gilmacieljr/1dbe21ea31903d1fa8184e08710760c0 to your computer and use it in GitHub Desktop.
Lab 8.2

Lab 8.2: Kafka and SASL

Welcome to the session 8 lab 2. The work for this lab is done in ~/kafka-training/lab8.2. In this lab, you are going to setup Kafka SSL support.

Please refer to the Kafka course notes for any updates or changes to this lab.

Find the latest version of this lab here. (Please input the correct link)

Authentication

Kafka Broker supports Authentication in producers and consumers, brokers, tools with methods SSL and SASL.

Kafka supports the following SASL mechanisms:

SASL/GSSAPI Kerberos (GSSAPI - Generic Security Services Application Program Interface - offers a data-security layer)
SASL/PLAIN (Simple cleartext password mechanism)
SASL/SCRAM-SHA-256 (SCRAM - Salted Challenge Response Authentication Mechanism - modern challenge-response scheme based mechanism with channel binding support)
SASL/SCRAM-SHA-512 (SCRAM - Salted Challenge Response Authentication Mechanism - modern challenge-response scheme based mechanism with channel binding support)

Kafka SASL Authentication - Brokers

Kafka uses JAAS (Java Authentication and Authorization Service) for SASL configuration.
In Kafka Broker JAAS config you have a section name KafkaServer for JAAS file, provides SASL configuration options and how SASL client connections are configured.
In Client section (-Dzookeeper.sasl.client=Client is default) use to authenticate a SASL connection with zookeeper (service name, Dzookeeper.sasl.client.username=zookeeper by default) an allows Kafka brokers to set SASL ACL on zookeeper nodes.
Locks nodes down so only brokers can modify ZooKeeper nodes.
Same principal must be used by all brokers in cluster.

Kafka SASL Authentication - Clients

Clients (Producers and Consumers) configure JAAS using client configuration property sasl.jaas.config or using the static JAAS config file
Configure a login module in KafkaClient for the selected mechanism GSSAPI (Kerberos), PLAIN or SCRAM -Djava.security.auth.login.config=/opt/kafka/conf/kafka_consumer_stocks_jaas.conf

~/kafka-training/lab8.2/solution/jaas/kafka_consumer_stocks_jaas.conf

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/kafka/conf/security/kafka_consumer.keytab"
  principal="kafka-consumer-stocks@cloudurable.com";
};

SASL Broker config

Kafka Broker Config : SASL configured with transport PLAINTEXT or SSL
listeners=SASL_PLAINTEXT://hostname:port listener= SASL_SSL://hostname:port security.inter.broker.protocol=SASL_PLAINTEXT or SASL_SSL

If SASL_SSL is used, then SSL has to be configured

Kafka SASL Mechanisms: GSSAPI (Kerberos), PLAIN, SCRAM-SHA-256, SCRAM-SHA-512

Kafka Authentication using SASL/Kerberos

If you use Active Directory then no need to setup Kerberos server
If not using Active Directory you will need to install it
If Oracle Java, download JCE policy files for your Java version to $JAVA_HOME/jre/lib/security

SASL Kerberos: Create Kerberos Principals for Kafka Broker

Ask your Kerberos or Active Directory admin for a principal for each Kafka broker in cluster. Ensure all hosts are reachable using hostnames, Kerberos requirement that all hosts are resolvable with FQDNs.

If running your own Kerberos server, create these principals

$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey \
	kafka/{hostname}@{REALM}'
$ sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab \
	kafka/{hostname}@{REALM}"

SASL Kerberos: Configuring Kafka Brokers for Kerberos

~/kafka-training/lab8.2/solution/jaas/kafka_broker_jaas.conf

KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/kafka/conf/security/kafka_broker.keytab"
  principal="kafka/kafka-broker.hostname.com1@cloudurable.com";
};

// Zookeeper client authentication
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/kafka/conf/security/kafka_broker.keytab"
  principal="kafka/kafka-broker1.hostname.com@cloudurable.com";
};

Pass to JVM starting up broker.
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/var/kafka/conf/secutiry/ kafka_server_jaas.conf

SASL Kerberos: Configuring Kafka Broker Config for Kerberos

Configure SASL port and SASL mechanisms in server.properties as described.
Configure service name (sasl.kerberos.service.name).
Match principal name of the kafka brokers from JAAS config on last slide. Recall principal was kafka/kafka-broker1.hostname.com@cloudurable.com .
Set sasl.enabled.mechanisms to GSSAPI (Kerberos).
Set inter broker communication to SASL_PLAINTEXT or SASL_SSL .

~/kafka-training/lab8.2/solution/config/server-0.properties

broker.id=0

listeners=SASL_PLAINTEXT://localhost:9092,SASL_SSL://localhost:10092
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
security.inter.broker.protocol=SASL_PLAINTEXT

ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required

log.dirs=./logs/kafka-0

default.replication.factor=3
num.partitions=8
min.insync.replicas=2 
auto.create.topics.enable=false
broker.rack=us-west2-a
queued.max.requests=1000
auto.leader.rebalance.enable=true

zookeeper.connect=localhost:2181
delete.topic.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000

ACTION - EDIT config/server-0.properties and follow instructions

~/kafka-training/lab8.2/solution/config/server-1.properties

broker.id=1

listeners=SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:10093
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
security.inter.broker.protocol=SASL_PLAINTEXT

ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required

log.dirs=./logs/kafka-1

default.replication.factor=3
num.partitions=8
min.insync.replicas=2 
auto.create.topics.enable=false
broker.rack=us-west2-a
queued.max.requests=1000
auto.leader.rebalance.enable=true

zookeeper.connect=localhost:2181
delete.topic.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000

ACTION - EDIT config/server-1.properties and follow instructions

~/kafka-training/lab8.2/solution/config/server-2.properties

broker.id=2

listeners=SASL_PLAINTEXT://localhost:9094,SASL_SSL://localhost:10094
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
security.inter.broker.protocol=SASL_PLAINTEXT

ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required

log.dirs=./logs/kafka-2

default.replication.factor=3
num.partitions=8
min.insync.replicas=2 
auto.create.topics.enable=false
broker.rack=us-west2-a
queued.max.requests=1000
auto.leader.rebalance.enable=true

zookeeper.connect=localhost:2181
delete.topic.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000

ACTION - EDIT config/server-2.properties and follow instructions

SASL Kerberos: Configuring Clients for SASL Kerberos

Sets the connection protocol to SASL_SSL, encrypt with SSL, authenticate with SASL.
Sets the service name to Kafka.
Sets the sasl.mechanism to Kerberos (GSSAPI).

~/kafka-training/lab8.2/solution/src/main/java/com/cloudurable/kafka/consumer/ConsumerUtil.java

package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class ConsumerUtil {

    private static Consumer<String, StockPrice> createConsumer(
            final String bootstrapServers, final String clientId ) {

        final Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BROKERS);

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");

        props.put("ssl.truststore.location", "/opt/kafka/conf/certs/kafka.truststore");
        props.put("ssl.truststore.password", "kafka123");
        props.put("ssl.keystore.location", "/opt/kafka/conf/certs/kafka.keystore");
        props.put("ssl.keystore.password", "kafka123");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "StockPriceConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StockDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new KafkaConsumer<>(props);
    }
...
}

ACTION - EDIT lab8.2/solution/src/main/java/com/cloudurable/kafka/consumer/ConsumerUtil.java and follow instructions

Kafka support multiple SASL Providers

Kafka supports more than one SASL provider

~/kafka-training/lab8.2/solution/jaas/kafka_broker_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="kafka123";
  
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
  user_admin="foobar"
  user_alice="barbaz";
};

ACTION - EDIT lab8.2/solution/jaas/kafka_broker_jaas.conf and follow instructions

Modifying SASL mechanism in a Running Cluster

SASL mechanism can be modified in a running cluster using the following sequence:

Enable new SASL mechanism, add mechanism sasl.enabled.mechanisms in Broker Config server.properties.
Update JAAS config file to include both mechanisms as describe.
Bounce one Kafka Broker at a time.
Restart clients using new mechanism.
Change mechanism of inter-broker communication (if this is required), set sasl.mechanism.inter.broker.protocol in Broker Config server.properties to new mechanism and bounce Kafka Brokers one at a time.
Remove old mechanism (if this is required), remove old mechanism from sasl.enabled.mechanisms in Broker Config server.properties and remove entries for old mechanism from JAAS config file, and once again bounce Kafka Broker one at a time.

Adding ACLs to users

~/kafka-training/lab8.2/solution/config/server-0.properties

broker.id=0

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

listeners=SASL_PLAINTEXT://localhost:9092,SASL_SSL://localhost:10092
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
security.inter.broker.protocol=SASL_PLAINTEXT

ssl.keystore.location=/opt/kafka/conf/certs/kafka.keystore
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=/opt/kafka/conf/certs/kafka.truststore
ssl.truststore.password=kafka123
ssl.client.auth=required

log.dirs=./logs/kafka-0

default.replication.factor=3
num.partitions=8
min.insync.replicas=2 
auto.create.topics.enable=false
broker.rack=us-west2-a
queued.max.requests=1000
auto.leader.rebalance.enable=true

zookeeper.connect=localhost:2181
delete.topic.enable=true
compression.type=producer
message.max.bytes=65536
replica.lag.time.max.ms=5000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000

ACTION - EDIT config/server-0.properties and follow instructions

~/kafka-training/lab8.2/solution/bin/create-acl.sh

#!/usr/bin/env bash
cd ~/kafka-training

kafka/bin/kafka-acls.sh \
	--authorizer-properties zookeeper.connect=localhost:2181 \
	--add --allow-principal User:stocks_consumer \
	--allow-host 10.0.1.11 --allow-host 198.51.100.1 \
	--operation Read  --topic stock-prices
	
kafka/bin/kafka-acls.sh \
	--authorizer-properties zookeeper.connect=localhost:2181 \
	--add --allow-principal User:stocks_producer \
	--allow-host 10.0.1.11 --allow-host 198.51.100.1 \
	--operation Write  --topic stock-prices

--allow-principal (Configure the user who connects to broker)
--allow-host (Configure the host who connects to broker)
--operation (Write to producers or Read to consumers)
--topic (Configure the topic)

ACTION - EDIT bin/create-acl.sh and follow instructions

Run the lab

ACTION - RUN ZooKeeper and three Kafka Brokers (scripts are under bin for ZooKeeper and Kafka Brokers).

ACTION - RUN ConsumerBlueMain from the IDE

ACTION - RUN StockPriceProducer from the IDE

Expected results

You should be able to send records from the producer to the broker and read records from the consumer to the broker using SASL.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment