Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jairsjunior/e15f531ff3a3ac022c17d8979bca5b36 to your computer and use it in GitHub Desktop.
Save jairsjunior/e15f531ff3a3ac022c17d8979bca5b36 to your computer and use it in GitHub Desktop.
Kafka Validation AuthenticateCallbackHandler OAuth implementation
package br.com.jairsjunior.security.oauthbearer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * {@link AuthenticateCallbackHandler} for the SASL/OAUTHBEARER mechanism that handles
 * {@link OAuthBearerValidatorCallback}s by introspecting the presented access token through
 * {@code OauthHttpCalls.introspectBearer} and rejecting tokens whose expiration time has passed.
 *
 * <p>The handler must be {@link #configure(Map, String, List) configured} before
 * {@link #handle(Callback[])} is invoked.
 */
public class OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
    private static final Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.class);

    /** Error status reported on the callback when a token is rejected (RFC 6750 error code). */
    private static final String INVALID_TOKEN_STATUS = "invalid_token";

    /** Read-only view of the options of the single JAAS entry passed to {@code configure}. */
    private Map<String, String> moduleOptions = null;
    private boolean configured = false;
    private Time time = Time.SYSTEM;

    /**
     * Validates the SASL mechanism and JAAS configuration and marks the handler as configured.
     *
     * @param map               client/broker configs; not used by this handler
     * @param saslMechanism     must be {@link OAuthBearerLoginModule#OAUTHBEARER_MECHANISM}
     * @param jaasConfigEntries must contain exactly one non-null entry
     * @throws IllegalArgumentException if the mechanism is not OAUTHBEARER or the JAAS entry
     *                                  list does not contain exactly one non-null entry
     * @throws NullPointerException     if {@code jaasConfigEntries} is null
     */
    @Override
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) {
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        }
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) {
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        }
        // getOptions() is declared as Map<String, ?>; the values are treated as Strings here,
        // exactly as the original cast did. Suppression is limited to this one local.
        @SuppressWarnings("unchecked")
        Map<String, String> options = (Map<String, String>) jaasConfigEntries.get(0).getOptions();
        this.moduleOptions = Collections.unmodifiableMap(options);
        configured = true;
    }

    /**
     * @return {@code true} once {@link #configure(Map, String, List)} has completed successfully
     */
    public boolean isConfigured() {
        return this.configured;
    }

    /** No resources are held by this handler, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Processes each callback in order; only {@link OAuthBearerValidatorCallback} is supported.
     *
     * @param callbacks callbacks supplied by the SASL server
     * @throws IllegalStateException        if the handler has not been configured
     * @throws IOException                  if validation raised a {@link KafkaException}
     *                                      (the original exception is kept as the cause)
     * @throws UnsupportedCallbackException if a callback of any other type is encountered
     */
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured()) {
            throw new IllegalStateException("Callback handler not configured");
        }
        for (Callback callback : callbacks) {
            if (!(callback instanceof OAuthBearerValidatorCallback)) {
                throw new UnsupportedCallbackException(callback);
            }
            try {
                handleCallback((OAuthBearerValidatorCallback) callback);
            } catch (KafkaException e) {
                throw new IOException(e.getMessage(), e);
            }
        }
    }

    /**
     * Introspects the token carried by {@code callback} and either attaches the validated token
     * to the callback or reports an {@code invalid_token} error on it.
     *
     * @param callback validator callback carrying the raw access token
     * @throws IllegalArgumentException if the callback carries no token value
     */
    private void handleCallback(OAuthBearerValidatorCallback callback) {
        String accessToken = callback.tokenValue();
        if (accessToken == null) {
            throw new IllegalArgumentException("Callback missing required token value");
        }

        log.info("Trying to introspect Token!");
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info("Trying to introspected");

        // Defensive: whether introspectBearer can return null is not visible from this class;
        // if it does, report a validation failure instead of failing with a NullPointerException.
        if (token == null) {
            log.warn("Token introspection returned no token; rejecting");
            callback.error(INVALID_TOKEN_STATUS, null, null);
            return;
        }

        // NOTE(review): assumes expirationTime() is in epoch milliseconds, like time.milliseconds().
        long now = time.milliseconds();
        if (now > token.expirationTime()) {
            // Previously a failure result was created and discarded, so the expired token was
            // still accepted below. Report the failure on the callback and stop here.
            log.warn("Expired Token, needs refresh!");
            callback.error(INVALID_TOKEN_STATUS, null, null);
            return;
        }

        log.info("Validated! token..");
        callback.token(token);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment