Skip to content

Instantly share code, notes, and snippets.

@rkpattnaik780
Last active March 7, 2024 14:08
Show Gist options
  • Save rkpattnaik780/b99334844d53813b8a85892113d42dff to your computer and use it in GitHub Desktop.
Save rkpattnaik780/b99334844d53813b8a85892113d42dff to your computer and use it in GitHub Desktop.
Code snippets for a Spring Boot Kafka client using SASL/OAUTHBEARER
import java.util.HashMap;
import java.util.Map;
/**
 * Holds the connection settings shared by Kafka clients that talk to the broker
 * over SASL_SSL with the OAUTHBEARER mechanism.
 */
public class KafkaConfig {

    /**
     * Assembles the base Kafka client properties: bootstrap servers, security
     * protocol, SASL mechanism, the JAAS login-module entry, the login callback
     * handler class, the token endpoint URL and the scope claim name.
     *
     * @return a newly created, mutable map on every call, so a caller can add
     *         further entries (for example serializers) to its own copy
     */
    static Map<String, Object> config() {
        // JAAS entry for the OAuth bearer login module; the client id, client secret
        // and token URL placeholders are embedded as double-quoted option values.
        // NOTE(review): values are concatenated verbatim, so a double quote inside
        // any of them would presumably break the JAAS syntax - confirm inputs.
        String jaasEntry = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                + " clientId=\"" + RHOAS_SERVICE_ACCOUNT_CLIENT_ID + "\""
                + " clientSecret=\"" + RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET + "\""
                + " oauth.token.endpoint.uri=\"" + RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL + "\";";

        Map<String, Object> props = new HashMap<>();

        // Where to connect and how the connection is secured.
        props.put("bootstrap.servers", KAFKA_HOST);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");

        // OAuth login wiring.
        props.put("sasl.jaas.config", jaasEntry);
        // NOTE(review): the handler lives in the "secured" package here; verify that
        // this class name matches the kafka-clients version on the classpath.
        props.put("sasl.login.callback.handler.class",
                "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
        // NOTE(review): this URL is hard-coded, while the JAAS entry above takes its
        // token URL from RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL - confirm both are
        // meant to point at the same endpoint.
        props.put("sasl.oauthbearer.token.endpoint.url",
                "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token");
        props.put("sasl.oauthbearer.scope.claim.name", "api.iam.service_accounts");

        return props;
    }
}
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.Map;
/**
 * Spring Boot application that wires a String/String Kafka producer on top of
 * {@link KafkaConfig#config()} and exposes a runner that sends one sample message.
 */
@SpringBootApplication
public class ProducerExample {

    /**
     * Launches the Spring application with this class as the primary source.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(ProducerExample.class, args);
    }

    /**
     * Creates the producer factory from the shared Kafka settings, with
     * {@link StringSerializer} registered for both keys and values.
     *
     * @return a producer factory for String keys and String values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // KafkaConfig.config() hands back a fresh map, so adding entries here
        // only affects this factory's settings.
        Map<String, Object> props = KafkaConfig.config();
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Creates the template used to publish messages, backed by
     * {@link #producerFactory()}.
     *
     * @return a Kafka template for String keys and String values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Defines a runner whose body sends the fixed text "Sample message" to
     * {@code TOPIC}. The value returned by {@code send} is not inspected here.
     *
     * @param template the Kafka template used to publish the message
     * @return the runner that performs the send
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send(TOPIC, "Sample message");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment