@jiazhai
Last active December 25, 2019 04:09
Steps for KoP authentication and authorization Based on JWT.

Please refer to the Pulsar Security Overview to understand Pulsar authentication and authorization. Currently, KoP supports Pulsar token authentication.

Before you start, prepare a KoP tarball and a Pulsar tarball (the Pulsar tarball is mainly needed for its pulsar-admin tools).

Some background before the tutorial

KoP currently supports SASL_PLAINTEXT, in which a username/password pair must be provided. KoP reuses Pulsar's token provider to implement SASL authentication; authorization still uses Pulsar authorization.

For example, in the Kafka client authentication config below:

  1. The username is not a real user name but the tenant/namespace set aside for Kafka to use (this can be set with kafkaTenant and kafkaNamespace in the KoP config file).
  2. The password is a token created for an authenticated user with the Pulsar token tools. When authentication succeeds, KoP resolves the token to that user (in the example below, test-user).
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="public/default" password="token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4";

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN
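As a minimal sketch of how the three settings above fit together, the helper below builds them from a tenant/namespace and a token using only `java.util.Properties`. The class name `KopSaslProps` and its `build` method are hypothetical names introduced here for illustration; they are not part of KoP or Kafka.

```java
import java.util.Properties;

// Hypothetical helper (not part of KoP or Kafka): assembles the SASL/PLAIN client settings.
class KopSaslProps {
    static Properties build(String tenantNamespace, String token) {
        // The JAAS "username" carries the tenant/namespace,
        // and the JAAS "password" carries "token:<JWT>".
        String jaas = String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"token:%s\";",
            tenantNamespace, token);

        Properties props = new Properties();
        props.setProperty("sasl.jaas.config", jaas);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        // "<your-token>" is a placeholder; pass the token created with the Pulsar token tools.
        Properties props = build("public/default", "<your-token>");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned properties can be merged into the usual producer or consumer settings (bootstrap servers, serializers, and so on) before the Kafka client is constructed.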

A tutorial using a symmetric secret key

  1. In the Pulsar root dir, create a secret key to be used on the broker side.
bin/pulsar tokens create-secret-key --output my-secret.key

Place the created file my-secret.key at a known location; the steps below refer to it as file:///path/to/my-secret.key.

  2. In the Pulsar root dir, generate a token for a user (test-user) using the created secret key:
bin/pulsar tokens create --secret-key file:///path/to/my-secret.key --subject test-user

For example, on my machine:

➜  pulsar git:(master) ✗ bin/pulsar tokens create --secret-key file:///Users/jia/ws/code/pulsar/my-secret.key --subject test-user
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4
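To see why this token stands for test-user, it helps to look inside it. A JWT has three base64url parts (header, payload, signature), and with a symmetric key the HS256 signature is an HMAC-SHA256 over the first two parts. The sketch below decodes the payload and checks the signature using only the JDK. The class `TokenSketch` and its method names are made up for illustration, and `main` assumes the key file holds the raw key bytes.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper for illustration; not part of Pulsar or KoP.
class TokenSketch {
    // Returns the decoded JSON payload (the middle part) of a JWT.
    static String payload(String token) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    // Computes the HS256 signature over "header.payload", base64url-encoded without padding.
    static String sign(byte[] key, String signingInput) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            byte[] sig = mac.doFinal(signingInput.getBytes(StandardCharsets.US_ASCII));
            return Base64.getUrlEncoder().withoutPadding().encodeToString(sig);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the token's signature matches the given secret key.
    static boolean verify(byte[] key, String token) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot < 0) {
            return false;
        }
        String expected = sign(key, token.substring(0, lastDot));
        // Constant-time comparison of the expected and presented signatures.
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.US_ASCII),
            token.substring(lastDot + 1).getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) throws Exception {
        // Assumption: args[0] is a key file with the raw key bytes, args[1] is the token.
        byte[] key = Files.readAllBytes(Paths.get(args[0]));
        System.out.println("payload: " + payload(args[1]));
        System.out.println("signature valid: " + verify(key, args[1]));
    }
}
```

For the token shown above, the payload decodes to {"sub":"test-user"}, which is the --subject passed on the command line.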
  3. On the KoP broker side, change the following settings in conf/kop_standalone.conf in the KoP root dir.
authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
authorizationEnabled=true
# The --subject name used when the token was created above.
superUserRoles=test-user
# The secret key path from above.
tokenSecretKey=file:///path/to/my-secret.key

brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
# The token generated above.
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4
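A typo in any of these keys tends to surface only later as a failed login, so a quick check of the file can save time. The sketch below loads a conf file with `java.util.Properties` and reports which of the settings listed in this step are missing or empty. `KopConfCheck` and `missingKeys` are hypothetical names, and the check is only as complete as the key list in this step.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of KoP.
class KopConfCheck {
    // The settings this tutorial changes in conf/kop_standalone.conf.
    static final String[] REQUIRED = {
        "authenticationEnabled",
        "authenticationProviders",
        "authorizationEnabled",
        "superUserRoles",
        "tokenSecretKey",
        "brokerClientAuthenticationPlugin",
        "brokerClientAuthenticationParameters"
    };

    // Returns the required keys that are absent or have an empty value.
    static List<String> missingKeys(Reader conf) {
        Properties props = new Properties();
        try {
            props.load(conf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the file named in this step; pass another path as args[0] if needed.
        String path = args.length > 0 ? args[0] : "conf/kop_standalone.conf";
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            System.out.println("missing or empty: " + missingKeys(reader));
        }
    }
}
```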
  4. In the Pulsar root dir, edit conf/client.conf (this file is used by pulsar-admin and pulsar-client).
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4
  5. In the KoP root dir, start KoP in standalone mode.
bin/kop standalone
  6. In the Pulsar root dir, grant permissions to the role.
# public/default is the default Kafka namespace; it may already exist.
bin/pulsar-admin namespaces create public/default
bin/pulsar-admin namespaces grant-permission public/default --role test-user --actions produce,consume
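pulsar-admin talks to the broker's admin REST API and sends the token from conf/client.conf as a bearer token. As a rough sketch of the request behind the grant-permission command above, the code below builds it with the JDK's `java.net.http` client (Java 11+). Several things here are assumptions: the admin URL http://localhost:8080 assumes a default standalone setup, the endpoint path follows the Pulsar admin REST API as I understand it and should be checked against the docs for your version, and `GrantPermissionSketch` is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch; the endpoint path is an assumption to verify against your Pulsar version.
class GrantPermissionSketch {
    // Builds a request that grants produce and consume on a namespace to a role.
    static HttpRequest buildRequest(String adminUrl, String namespace, String role, String token) {
        URI uri = URI.create(
            adminUrl + "/admin/v2/namespaces/" + namespace + "/permissions/" + role);
        return HttpRequest.newBuilder(uri)
            // Token authentication over HTTP uses a standard bearer header.
            .header("Authorization", "Bearer " + token)
            .header("Content-Type", "application/json")
            // The request body is the list of actions to grant.
            .POST(HttpRequest.BodyPublishers.ofString("[\"produce\",\"consume\"]"))
            .build();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: the token generated earlier (without the "token:" prefix).
        HttpRequest request =
            buildRequest("http://localhost:8080", "public/default", "test-user", args[0]);
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the request is rejected with an authentication error, the token or the superUserRoles setting on the broker is the first thing to check.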
  7. In the Pulsar root dir, verify that authentication is working.
bin/pulsar-client consume topicpulsar -s sub-name -n 0 -t Shared

bin/pulsar-client produce topicpulsar --messages "hello-test1" -n 10
  8. In KoP, kafka-console-producer does not handle passed-in config files very well, so you may need to write your own code to verify it.

Here is a simple code example:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DemoSASL {
    public static void main(String[] args) {
        String topicName = "topic7";
        String bootServer = "127.0.0.1:9092";
        String producerId = "DemoKafkaOnPulsarSaslProducer";
        String consumerId = "DemoKafkaOnPulsarSaslConsumer";

        String intSerializer = IntegerSerializer.class.getName();
        String stringSerializer = StringSerializer.class.getName();
        String intDeserializer = IntegerDeserializer.class.getName();
        String stringDeserializer = StringDeserializer.class.getName();

        String consumerGroupId = "DemoConsumerGroup-A";

        // SASL settings: the username is the tenant/namespace, the password is "token:<JWT>".
        String username = "public/default";
        String password = "token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.EM0bK-K65xZeHEN8Wbjl0OQcm6lWHkBzEv8ngysUBX4";
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule "
            + "required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);

        // Producer properties:
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, intSerializer);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        producerProps.put("sasl.jaas.config", jaasCfg);
        producerProps.put("security.protocol", "SASL_PLAINTEXT");
        producerProps.put("sasl.mechanism", "PLAIN");

        KafkaProducer<Integer, String> kProducer = new KafkaProducer<>(producerProps);

        // Consumer properties:
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, intDeserializer);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

        consumerProps.put("sasl.jaas.config", jaasCfg);
        consumerProps.put("security.protocol", "SASL_PLAINTEXT");
        consumerProps.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<Integer, String> kConsumer = new KafkaConsumer<>(consumerProps);

        int totalMsgs = 10;
        String messageStrPrefix = topicName + "_message_";

        // Send the test messages, then flush so they reach the broker before consuming.
        for (int i = 0; i < totalMsgs; i++) {
            String messageStr = messageStrPrefix + i;
            kProducer.send(new ProducerRecord<>(topicName, i, messageStr));
        }
        kProducer.flush();
        kProducer.close();

        kConsumer.subscribe(Collections.singleton(topicName));

        // Poll until all messages are read back, checking each value against its key.
        int i = 0;
        while (i < totalMsgs) {
            ConsumerRecords<Integer, String> records = kConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Integer, String> record : records) {
                Integer key = record.key();
                String expected = messageStrPrefix + key;
                if (!expected.equals(record.value())) {
                    throw new IllegalStateException(
                        "Unexpected value for key " + key + ": " + record.value());
                }
                i++;
                System.out.printf("Kafka consumer got message: %s, key: %d at offset %d%n",
                    record.value(), key, record.offset());
            }
        }
        kConsumer.close();
    }
}