@recursivecodes
Last active December 18, 2019 17:34
CompatibleConsumer.java
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

String authToken = System.getenv("AUTH_TOKEN");
String tenancyName = System.getenv("TENANCY_NAME");
String username = System.getenv("STREAMING_USERNAME");
String streamPoolId = System.getenv("STREAM_POOL_ID");
String topicName = System.getenv("TOPIC_NAME");
Properties properties = new Properties();
properties.put("bootstrap.servers", "streaming.us-phoenix-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-0");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";"
);
properties.put("max.partition.fetch.bytes", 1024 * 1024); // limit request size to 1MB per partition
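// Key type must match the configured key deserializer (StringDeserializer), so use String, not Long
Consumer<String, String> consumer = new KafkaConsumer<>(properties);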
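
The snippet above reads its settings straight from `System.getenv`, so a missing variable ends up as the literal text `null` inside the `sasl.jaas.config` string and only shows up later as an authentication failure. A minimal sketch of one way to guard against that follows. The class name `JaasConfigSketch` and the helpers `require` and `buildJaasConfig` are hypothetical and not part of the original gist or of the Kafka client. The sketch assumes the same environment variable names as above and performs no escaping of special characters in the values.

```java
import java.util.Map;

// Hypothetical helper class (not part of the gist or of kafka-clients).
class JaasConfigSketch {

    // Return the named setting, or fail fast with a clear message if it is absent or blank.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }

    // Build the same sasl.jaas.config value as the snippet above:
    // username is tenancy/user/streamPoolId, password is the auth token.
    // Assumption: values contain no double quotes; nothing is escaped here.
    static String buildJaasConfig(Map<String, String> env) {
        String username = require(env, "TENANCY_NAME") + "/"
                + require(env, "STREAMING_USERNAME") + "/"
                + require(env, "STREAM_POOL_ID");
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + require(env, "AUTH_TOKEN") + "\";";
    }
}
```

With this in place, the config line could be written as `properties.put("sasl.jaas.config", JaasConfigSketch.buildJaasConfig(System.getenv()));`. Taking a `Map` rather than calling `System.getenv` inside the helper keeps it easy to exercise with test values.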