Created
December 19, 2018 14:21
-
-
Save jairsjunior/d9b3a7a3bf381e709853a3a18b1cccea to your computer and use it in GitHub Desktop.
Kafka Login AuthenticateCallbackHandler OAuth implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.jairsjunior.security.oauthbearer; | |
import org.apache.kafka.common.KafkaException; | |
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; | |
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; | |
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; | |
import javax.security.auth.callback.Callback; | |
import javax.security.auth.callback.UnsupportedCallbackException; | |
import javax.security.auth.login.AppConfigurationEntry; | |
import java.io.IOException; | |
import java.util.*; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler { | |
private final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class); | |
private Map<String, String> moduleOptions = null; | |
private boolean configured = false; | |
@Override | |
public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { | |
if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) | |
throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); | |
if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) | |
throw new IllegalArgumentException( | |
String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", | |
jaasConfigEntries.size())); | |
this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions()); | |
configured = true; | |
} | |
public boolean isConfigured(){ | |
return this.configured; | |
} | |
@Override | |
public void close() { | |
} | |
@Override | |
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { | |
if (!isConfigured()) | |
throw new IllegalStateException("Callback handler not configured"); | |
for (Callback callback : callbacks) { | |
if (callback instanceof OAuthBearerTokenCallback) | |
try { | |
handleCallback((OAuthBearerTokenCallback) callback); | |
} catch (KafkaException e) { | |
throw new IOException(e.getMessage(), e); | |
} | |
else | |
throw new UnsupportedCallbackException(callback); | |
} | |
} | |
private void handleCallback(OAuthBearerTokenCallback callback){ | |
if (callback.token() != null) | |
throw new IllegalArgumentException("Callback had a token already"); | |
log.info("Try to acquire token!"); | |
OauthBearerTokenJwt token = OauthHttpCalls.login(null); | |
log.info("Retrieved token.."); | |
if(token == null){ | |
throw new IllegalArgumentException("Null token returned from server"); | |
} | |
callback.token(token); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment