Steps for KoP authentication and authorization based on JWT.
Please refer to the Pulsar Security Overview to understand Pulsar authentication and authorization. Currently KoP supports Pulsar's token-based authentication.
Before starting, prepare a KoP tarball and a Pulsar tarball (the latter is mainly used for its pulsar-admin tools).
In KoP, we currently support SASL_PLAINTEXT, in which a username/password pair must be provided. We reuse the token provider in Pulsar to implement KoP's SASL authentication; authorization still uses Pulsar's authorization.
For example, in the Kafka client authentication config below:
- the username is not a real user name, but the tenant/namespace that Kafka is set to use (configurable via kafkaTenant and kafkaNamespace in the KoP config file);
- the password is a token created for an authenticated user by the Pulsar token auth tools. When authentication succeeds, inside KoP the token resolves to and stands for the authenticated user (in the example below, test-user).
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"public/default\" password=\"token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4\";"
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
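The sasl.jaas.config value above is just a formatted string, so a client can assemble it at runtime instead of hard-coding it. A minimal sketch (the class name is illustrative; the token is the sample token used throughout this document):

```java
public class JaasConfig {

    // Build the sasl.jaas.config value from the tenant/namespace and the token.
    static String buildJaas(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                username, password);
    }

    public static void main(String[] args) {
        // username: the tenant/namespace Kafka topics map to.
        // password: "token:" followed by the JWT.
        String token = "token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ."
                + "EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4";
        System.out.println(buildJaas("public/default", token));
    }
}
```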
A tutorial for using a symmetric secret key
- In the Pulsar root dir, create a secret key to be used on the broker side.
bin/pulsar tokens create-secret-key --output my-secret.key
Place the created file my-secret.key at file:///path/to/my-secret.key.
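Conceptually, this step just generates a random symmetric key suitable for HS256 signing and writes it to a file. A rough sketch of the idea in plain JDK code (the file name is illustrative, and the real tool may encode the key differently depending on its flags):

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.crypto.KeyGenerator;

public class CreateSecretKey {
    public static void main(String[] args) throws Exception {
        // Generate a 256-bit key usable for HMAC-SHA256 (the HS256 JWT algorithm).
        KeyGenerator gen = KeyGenerator.getInstance("HmacSHA256");
        gen.init(256);
        byte[] key = gen.generateKey().getEncoded();
        // Write the key bytes to a file, analogous to --output my-secret.key.
        Files.write(Paths.get("my-secret.key"), key);
        System.out.println("wrote " + key.length + " key bytes");
    }
}
```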
- In the Pulsar root dir, generate a token associated with a user (test-user) using the created secret key:
bin/pulsar tokens create --secret-key file:///path/to/my-secret.key --subject test-user
For example, on my machine:
➜ pulsar git:(master) ✗ bin/pulsar tokens create --secret-key file:///Users/jia/ws/code/pulsar/my-secret.key --subject test-user
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4
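The generated token is a standard JWT: a header, a payload, and a signature, each base64url-encoded and joined by dots. You can inspect the subject claim without any library (the class name is illustrative; the token is the one generated above):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class InspectToken {

    // Decode the middle (payload) segment of a JWT; it is base64url-encoded JSON.
    static String decodePayload(String token) {
        String[] parts = token.split("\\.");
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ."
                + "EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4";
        System.out.println(decodePayload(token)); // prints {"sub":"test-user"}
    }
}
```

The "sub" claim is what Pulsar treats as the role for authorization, which is why test-user appears again in the broker config and grant-permission steps below.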
- On the KoP broker side, change the following settings in the file conf/kop_standalone.conf in the KoP root dir.
authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
authorizationEnabled=true
superUserRoles=test-user < === the name passed to --subject in step 2 above.
tokenSecretKey=file:///path/to/my-secret.key < == the key path from step 1.
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4 < == the token from step 2.
- In the Pulsar root dir, change the content of the file conf/client.conf (this file is used by pulsar-admin and pulsar-client).
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4
- In the KoP root dir, start KoP in standalone mode.
bin/kop standalone
- In the Pulsar root dir, grant permissions for the role.
bin/pulsar-admin namespaces create public/default < === This is the default Kafka namespace; it may already exist.
bin/pulsar-admin namespaces grant-permission public/default --role test-user --actions produce,consume
- In the Pulsar root dir, verify that authentication is working.
bin/pulsar-client consume topicpulsar -s sub-name -n 0 -t Shared
bin/pulsar-client produce topicpulsar --messages "hello-test1" -n 10
- The Kafka console producer and consumer do not handle passed-in config files well, so you may need to write your own code to verify KoP. Here is a simple example:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DemoSASL {

    public static void main(String[] args) {
        String topicName = "topic7";
        String bootServer = "127.0.0.1:9092";
        String producerId = "DemoKafkaOnPulsarSaslProducer";
        String consumerId = "DemoKafkaOnPulsarSaslConsumer";
        String intSerializer = IntegerSerializer.class.getName();
        String stringSerializer = StringSerializer.class.getName();
        String intDeserializer = IntegerDeserializer.class.getName();
        String stringDeserializer = StringDeserializer.class.getName();
        String consumerGroupId = "DemoConsumerGroup-A";

        // SASL settings: username is the tenant/namespace, password is the token.
        String username = "public/default";
        String password = "token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4";
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule "
                + "required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);

        // Producer properties.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, intSerializer);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        producerProps.put("sasl.jaas.config", jaasCfg);
        producerProps.put("security.protocol", "SASL_PLAINTEXT");
        producerProps.put("sasl.mechanism", "PLAIN");
        KafkaProducer<Integer, String> kProducer = new KafkaProducer<>(producerProps);

        // Consumer properties.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, intDeserializer);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        consumerProps.put("sasl.jaas.config", jaasCfg);
        consumerProps.put("security.protocol", "SASL_PLAINTEXT");
        consumerProps.put("sasl.mechanism", "PLAIN");
        KafkaConsumer<Integer, String> kConsumer = new KafkaConsumer<>(consumerProps);

        // Produce some messages.
        int totalMsgs = 10;
        String messageStrPrefix = topicName + "_message_";
        for (int i = 0; i < totalMsgs; i++) {
            String messageStr = messageStrPrefix + i;
            kProducer.send(new ProducerRecord<>(topicName, i, messageStr));
        }
        kProducer.flush();

        // Consume them back and check the payloads.
        kConsumer.subscribe(Collections.singleton(topicName));
        int i = 0;
        while (i < totalMsgs) {
            ConsumerRecords<Integer, String> records = kConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Integer, String> record : records) {
                Integer key = record.key();
                if (!(messageStrPrefix + key).equals(record.value())) {
                    throw new IllegalStateException("unexpected message: " + record.value());
                }
                i++;
                System.out.printf("Kafka consumer got key %s, value %s at offset %d%n",
                        record.key(), record.value(), record.offset());
            }
        }

        kProducer.close();
        kConsumer.close();
    }
}