Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jairsjunior/d9b3a7a3bf381e709853a3a18b1cccea to your computer and use it in GitHub Desktop.
Save jairsjunior/d9b3a7a3bf381e709853a3a18b1cccea to your computer and use it in GitHub Desktop.
Kafka Login AuthenticateCallbackHandler OAuth implementation
package br.com.jairsjunior.security.oauthbearer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Login-side {@link AuthenticateCallbackHandler} for the SASL/OAUTHBEARER mechanism.
 *
 * <p>When asked to handle an {@link OAuthBearerTokenCallback}, this handler obtains a token
 * through {@code OauthHttpCalls.login(...)} and stores it on the callback. Any other callback
 * type is rejected with {@link UnsupportedCallbackException}.
 *
 * <p>{@link #configure(Map, String, List)} must complete successfully before
 * {@link #handle(Callback[])} is invoked; otherwise {@code handle} throws
 * {@link IllegalStateException}.
 *
 * <p>NOTE(review): the fields are plain (non-volatile) and no synchronization is performed in
 * this class; confirm the threading expectations of the caller before sharing an instance.
 */
public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {

    // One logger per class is sufficient; no need to create one for every handler instance.
    private static final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class);

    // Read-only view of the JAAS login module options captured in configure().
    // NOTE(review): nothing in this class reads these options yet (login is called with null).
    private Map<String, String> moduleOptions = null;

    // Set to true once configure() has validated its arguments and stored the options.
    private boolean configured = false;

    /**
     * Validates the SASL mechanism and JAAS configuration and stores the JAAS module options.
     *
     * @param map               configuration map supplied by the caller; not used by this handler
     * @param saslMechanism     must equal {@link OAuthBearerLoginModule#OAUTHBEARER_MECHANISM}
     * @param jaasConfigEntries must contain exactly one non-null entry
     * @throws IllegalArgumentException if the mechanism is unexpected, or if the JAAS entry list
     *                                  does not hold exactly one non-null entry
     * @throws NullPointerException     if {@code jaasConfigEntries} is {@code null}
     */
    @Override
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) {
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        }
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) {
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        }
        // getOptions() is declared as Map<String, ?>; the cast is unchecked, so the suppression is
        // scoped to this single local instead of the whole method.
        // NOTE(review): assumes every JAAS option value is a String - confirm against the JAAS config.
        @SuppressWarnings("unchecked")
        Map<String, String> options = (Map<String, String>) jaasConfigEntries.get(0).getOptions();
        this.moduleOptions = Collections.unmodifiableMap(options);
        configured = true;
    }

    /**
     * Reports whether {@link #configure(Map, String, List)} has completed successfully.
     *
     * @return {@code true} once the handler has been configured
     */
    public boolean isConfigured() {
        return this.configured;
    }

    /** No resources are held by this handler, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Processes the given callbacks in order, supplying a token to each
     * {@link OAuthBearerTokenCallback}.
     *
     * @param callbacks callbacks to process
     * @throws IllegalStateException        if the handler has not been configured
     * @throws IOException                  if token handling fails with a {@link KafkaException}
     *                                      (the original exception is kept as the cause)
     * @throws UnsupportedCallbackException if a callback is not an {@link OAuthBearerTokenCallback}
     */
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured()) {
            throw new IllegalStateException("Callback handler not configured");
        }
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    // Translate to the checked type declared by the interface, preserving the cause.
                    throw new IOException(e.getMessage(), e);
                }
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    /**
     * Acquires a token via {@code OauthHttpCalls.login} and sets it on the callback.
     *
     * @param callback callback that must not already carry a token
     * @throws IllegalArgumentException if the callback already has a token, or if the login call
     *                                  returned {@code null}
     */
    private void handleCallback(OAuthBearerTokenCallback callback) {
        if (callback.token() != null) {
            throw new IllegalArgumentException("Callback had a token already");
        }
        log.info("Try to acquire token!");
        // NOTE(review): login is invoked with null rather than the stored moduleOptions;
        // confirm that OauthHttpCalls.login does not need the JAAS options.
        OauthBearerTokenJwt token = OauthHttpCalls.login(null);
        if (token == null) {
            throw new IllegalArgumentException("Null token returned from server");
        }
        // Logged only after the null check so the message is never emitted for a failed login.
        log.info("Retrieved token..");
        callback.token(token);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment