Skip to content

Instantly share code, notes, and snippets.

@mageshn
Last active February 8, 2018 23:47
Show Gist options
  • Save mageshn/c623f2d43b66b1153f5da32662ca5e94 to your computer and use it in GitHub Desktop.
Save mageshn/c623f2d43b66b1153f5da32662ca5e94 to your computer and use it in GitHub Desktop.
SR integration with security
public void send() {
Properties props = new Properties();
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"SASL_SSL://test:9092"
);
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class
);
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class
);
props.put(
"schema.registry.url",
"https://test:32001,https://test:32000"
);
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule"
+ " required "
+ "username=\"test\" "
+ "password=\"test\" ;");
props.setProperty("sasl.mechanism", "PLAIN");
props.setProperty("security.protocol", "SASL_SSL");
//If you want to use same credentials as Kafka
props.setProperty("basic.auth.credentials.source", "SASL_INHERIT");
//If you want to provide your own credentials to SR
/*
props.setProperty("basic.auth.credentials.source", "USER_INFO");
props.setProperty("schema.registry.basic.auth.user.info", "username:password");
*/
KafkaProducer producer = new KafkaProducer(props);
String key = "key1";
String userSchema = "{\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic1", key, avroRecord);
try {
producer.send(record);
} catch (SerializationException e) {
// may need to do something with it
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment