Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
cassandra-driver CPU issue
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ably.cassandra.auth;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.cassandra.auth.Auth;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.DataResource;
import org.apache.cassandra.auth.ISaslAwareAuthenticator;
import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AblyAuthenticator implements ISaslAwareAuthenticator
{
private static final Logger logger = LoggerFactory.getLogger(AblyAuthenticator.class);
private static final String ABLY_ENVIRONMENTS = "ABLY_ENVIRONMENTS";
private static final String DEFAULT_ENVIRONMENTS = "production,sandbox,staging";
private static final String CASSANDRA_SUPER_USER_PASSWORD = "CASSANDRA_SUPER_USER_PASSWORD";
private static final String ABLY_ = "ABLY_";
private static final String KEYSPACE = "_KEYSPACE";
private static final String ADMIN_USER_NAME = "_ADMIN_USER_NAME";
private static final String RUNTIME_USER_NAME = "_RUNTIME_USER_NAME";
private static final String ADMIN_PASSWORD = "_ADMIN_PASSWORD";
private static final String RUNTIME_PASSWORD = "_RUNTIME_PASSWORD";
private static final int MAX_PASSWORD_LENGTH = 64;
private String ablyEnvVar(String environment, String type) { return ABLY_ + environment.toUpperCase() + type; }
@Override
public boolean requireAuthentication() {
return true;
}
@Override
public Set<Option> supportedOptions() {
return Collections.emptySet();
}
@Override
public Set<Option> alterableOptions() {
return Collections.emptySet();
}
@Override
public AuthenticatedUser authenticate(Map<String, String> credentials) throws AuthenticationException
{
return authenticate(credentials.get(USERNAME_KEY), credentials.get(PASSWORD_KEY));
}
@Override
public void create(String username, Map<Option, Object> options) throws InvalidRequestException {
throw new InvalidRequestException("CREATE USER operation is not supported by AblyAuthenticator");
}
@Override
public void alter(String username, Map<Option, Object> options) throws InvalidRequestException {
throw new InvalidRequestException("ALTER USER operation is not supported by AblyAuthenticator");
}
@Override
public void drop(String username) throws InvalidRequestException {
throw new InvalidRequestException("DROP USER operation is not supported by AblyAuthenticator");
}
@Override
public Set<DataResource> protectedResources() {
return Collections.emptySet();
}
@Override
public void validateConfiguration() throws ConfigurationException {
}
@Override
public ISaslAwareAuthenticator.SaslAuthenticator newAuthenticator() {
return new PlainTextSaslAuthenticator();
}
@Override
public void setup() {
logger.info("AblyAuthenticator setup()");
if(!logger.isInfoEnabled())
logger.warn("AblyAuthenticator has no INFO logging");
superuserPassword = System.getenv(CASSANDRA_SUPER_USER_PASSWORD);
if(superuserPassword == null) {
logger.warn("AblyAuthenticator using default superuser password");
superuserPassword = "xx";//Auth.DEFAULT_SUPERUSER_NAME;
} else {
logger.info("AblyAuthenticator loading non-default superuser password");
}
String environmentNames = System.getenv(ABLY_ENVIRONMENTS);
if(environmentNames == null) {
logger.warn("AblyAuthenticator using default environments");
environmentNames = DEFAULT_ENVIRONMENTS;
} else {
logger.info("AblyAuthenticator loading environments {}", environmentNames);
}
for(String name : environmentNames.split(",")) {
try {
Environment environment = new Environment(name);
environments.put(name, environment);
if(environment.adminUsername != null) {
logger.info("AblyAuthenticator adding user {} for environment {}", environment.adminUsername, name);
environmentsByUsername.put(environment.adminUsername, environment);
}
if(environment.runtimeUsername != null) {
logger.info("AblyAuthenticator adding user {} for environment {}", environment.runtimeUsername, name);
environmentsByUsername.put(environment.runtimeUsername, environment);
}
} catch(Throwable t) {
logger.warn("AblyAuthenticator unable to instantiate environment {}", name);
}
}
}
Environment getEnvironmentForUser(String username) {
return environmentsByUsername.get(username);
}
public class Environment {
public final String name;
public final String keyspace;
public final String adminUsername;
public final String runtimeUsername;
private final String[] adminPasswords;
private final String[] runtimePasswords;
Environment(String name) {
this.name = name;
keyspace = System.getenv(ablyEnvVar(name, KEYSPACE));
if(keyspace == null) {
logger.info("AblyAuthenticator no keyspace found for environment {}", name);
throw new IllegalArgumentException("No keyspace found for environment");
}
adminUsername = System.getenv(ablyEnvVar(name, ADMIN_USER_NAME));
if(adminUsername == null) {
logger.warn("AblyAuthenticator no admin username found for environment {}", name);
adminPasswords = null;
} else {
String adminPasswordConf = System.getenv(ablyEnvVar(name, ADMIN_PASSWORD));
if(adminPasswordConf == null) {
logger.info("AblyAuthenticator no password found for admin user for environment {}", name);
throw new IllegalArgumentException("No password found for user");
}
adminPasswords = adminPasswordConf.split(",");
}
runtimeUsername = System.getenv(ablyEnvVar(name, RUNTIME_USER_NAME));
if(runtimeUsername == null) {
logger.warn("AblyAuthenticator no runtime username found for environment {}", name);
runtimePasswords = null;
} else {
String runtimePasswordConf = System.getenv(ablyEnvVar(name, RUNTIME_PASSWORD));
if(runtimePasswordConf == null) {
logger.info("AblyAuthenticator no password found for runtime user for environment {}", name);
throw new IllegalArgumentException("No password found for user");
}
runtimePasswords = runtimePasswordConf.split(",");
}
}
boolean authenticate(String possibleUsername, String possiblePassword) {
if(adminUsername.equals(possibleUsername)) {
for(String password : adminPasswords) {
if(constantTimeCompare(password, possiblePassword))
return true;
}
}
else if(runtimeUsername.equals(possibleUsername)) {
for(String password : runtimePasswords) {
if(constantTimeCompare(password, possiblePassword))
return true;
}
}
return false;
}
}
private AuthenticatedUser authenticate(String username, String password) throws AuthenticationException {
if(username == null) {
logger.warn("AblyAuthenticator failed to authenticate (no username provided)");
throw new AuthenticationException(String.format("Required key '%s' is missing", USERNAME_KEY));
}
if(password == null) {
logger.warn("AblyAuthenticator failed to authenticate (no password provided)");
throw new AuthenticationException(String.format("Required key '%s' is missing", PASSWORD_KEY));
}
if(Auth.DEFAULT_SUPERUSER_NAME.equals(username)) {
if(constantTimeCompare(superuserPassword, password))
return new SuperUser(username);
logger.warn("AblyAuthenticator failed to authenticate superuser");
throw new AuthenticationException("Username and/or password are incorrect");
}
Environment environment = getEnvironmentForUser(username);
if(environment == null) {
logger.warn("AblyAuthenticator failed to authenticate {} (unknown user)", username);
throw new AuthenticationException("Username and/or password are incorrect");
}
if(!environment.authenticate(username, password)) {
logger.warn("AblyAuthenticator failed to authenticate {} (invalid password)", username);
throw new AuthenticationException("Username and/or password are incorrect");
}
logger.info("AblyAuthenticator authenticated user {}", username);
return new EnvironmentUser(environment, username);
}
private class PlainTextSaslAuthenticator implements SaslAuthenticator {
private static final byte NUL = 0;
private boolean complete = false;
private String username;
private String password;
@Override
public byte[] evaluateResponse(byte[] clientResponse) throws AuthenticationException {
decodeCredentials(clientResponse);
complete = true;
return null;
}
@Override
public boolean isComplete() {
return complete;
}
@Override
public AuthenticatedUser getAuthenticatedUser() throws AuthenticationException {
return authenticate(username, password);
}
/**
* SASL PLAIN mechanism specifies that credentials are encoded in a
* sequence of UTF-8 bytes, delimited by 0 (US-ASCII NUL).
* The form is : {code}authzId<NUL>authnId<NUL>password<NUL>{code}
* authzId is optional, and in fact we don't care about it here as we'll
* set the authzId to match the authnId (that is, there is no concept of
* a user being authorized to act on behalf of another).
*
* @param bytes encoded credentials string sent by the client
* @return map containing the username/password pairs in the form an IAuthenticator
* would expect
* @throws javax.security.sasl.SaslException
*/
private void decodeCredentials(byte[] bytes) throws AuthenticationException
{
logger.info("Decoding credentials from client token");
byte[] usernameBytes = null;
byte[] passwordBytes = null;
int end = bytes.length;
for (int i = bytes.length - 1 ; i >= 0; i--)
{
if(bytes[i] == NUL)
{
if(passwordBytes == null)
passwordBytes = Arrays.copyOfRange(bytes, i + 1, end);
else if(usernameBytes == null)
usernameBytes = Arrays.copyOfRange(bytes, i + 1, end);
end = i;
}
}
if(usernameBytes == null)
throw new AuthenticationException("Authentication ID must not be null");
if(passwordBytes == null)
throw new AuthenticationException("Password must not be null");
username = new String(usernameBytes, StandardCharsets.UTF_8);
password = new String(passwordBytes, StandardCharsets.UTF_8);
}
}
private boolean constantTimeCompare(String one, String two) {
int oneLength = one.length(), twoLength = two.length();
boolean result = (oneLength == twoLength);
int minLength = Math.min(oneLength, twoLength);
for(int i = 0; i < MAX_PASSWORD_LENGTH; i++) {
int idx = i % minLength;
result &= (one.charAt(idx) == two.charAt(idx));
}
return result;
}
private String superuserPassword;
private final Map<String, Environment> environments = new HashMap<String, Environment>();
private final Map<String, Environment> environmentsByUsername = new HashMap<String, Environment>();
public static class EnvironmentUser extends AuthenticatedUser {
public EnvironmentUser(Environment environment, String name) {
super(name);
this.environment = environment;
}
public boolean isSuper() {
return false;
}
/* we pretend to be anonymous to defeat the check in ClientState.login();
* as an anonymous user we're not able to perform user-management queries
* but they're not supported by this Authorizer anyway */
public boolean isAnonymous() {
return true;
}
public final Environment environment;
}
public static class SuperUser extends AuthenticatedUser {
public SuperUser(String name) {
super(name);
}
public boolean isSuper() {
return true;
}
}
}
module Cassandra
  module Auth
    module Providers
      # Client-side auth provider for servers that announce the
      # io.ably.cassandra.auth.AblyAuthenticator authenticator class.
      class AblyAuthentication < Provider
        # Server authenticator class name this provider is willing to answer.
        PASSWORD_AUTHENTICATOR_FQCN = 'io.ably.cassandra.auth.AblyAuthenticator'.freeze

        # Produces the authentication exchange for a single connection.
        # Each callback prints a trace line to stdout when it is invoked.
        class Authenticator
          def initialize(username, password)
            @username, @password = username, password
          end

          # First message sent to the server: NUL, username, NUL, password.
          def initial_response
            puts "initial response"
            "\x00#{@username}\x00#{@password}"
          end

          # Only traces the call; returns nil.
          def challenge_response(token)
            puts "challenge response"
          end

          # Only traces the call; returns nil.
          def authentication_successful(token)
            puts "success"
          end
        end

        def initialize(username, password)
          @username, @password = username, password
        end

        # Returns an Authenticator when the server asks for the Ably
        # authenticator class, and nil for any other class name.
        def create_authenticator(authentication_class)
          return nil unless authentication_class == PASSWORD_AUTHENTICATOR_FQCN

          Authenticator.new(@username, @password)
        end
      end
    end
  end
end
# Experiment: repoint the stock Password provider's authenticator class-name
# constant at the Ably authenticator, printing the value before and after,
# then connect with plain username/password options.
logger = Logger.new(STDOUT).tap { |log| log.level = Logger::DEBUG }
password_provider = ::Cassandra::Auth::Providers::Password
puts "Previous PASSWORD_AUTHENTICATOR_FQCN: #{password_provider::PASSWORD_AUTHENTICATOR_FQCN}"
# Remove the constant first so that const_set does not warn about re-initialization.
password_provider.send(:remove_const, :PASSWORD_AUTHENTICATOR_FQCN)
password_provider.const_set(:PASSWORD_AUTHENTICATOR_FQCN, 'io.ably.cassandra.auth.AblyAuthenticator'.freeze)
puts "New PASSWORD_AUTHENTICATOR_FQCN: #{password_provider::PASSWORD_AUTHENTICATOR_FQCN}"
Cassandra.cluster(username: username, password: password, hosts: [host], logger: logger)
# Experiment: connect using the custom AblyAuthentication provider instead of
# the driver's username/password options.
logger = Logger.new(STDOUT).tap { |log| log.level = Logger::DEBUG }
ably_provider = Cassandra::Auth::Providers::AblyAuthentication.new(username, password)
Cassandra.cluster(auth_provider: ably_provider, hosts: [host], logger: logger)
D, [2015-02-19T22:49:49.029830 #17613] DEBUG -- : Populating policies and listeners with initial endpoints
D, [2015-02-19T22:49:49.038253 #17613] DEBUG -- : Host 0.0.0.0 is found and up
I, [2015-02-19T22:49:49.038330 #17613] INFO -- : Establishing control connection
D, [2015-02-19T22:49:49.038532 #17613] DEBUG -- : Connecting to 0.0.0.0
I, [2015-02-19T22:49:49.041143 #17613] INFO -- : Host 0.0.0.0 doesn't support protocol version 3, downgrading
initial response
success
D, [2015-02-19T22:49:49.077973 #17613] DEBUG -- : 5 peer(s) found
D, [2015-02-19T22:49:49.086968 #17613] DEBUG -- : Host 0.0.0.0 metadata has been updated, it will be considered lost and found
D, [2015-02-19T22:49:49.087241 #17613] DEBUG -- : Host 0.0.0.0 is down and lost
D, [2015-02-19T22:49:49.087507 #17613] DEBUG -- : Starting to continuously refresh status of 0.0.0.0 in 0.5 seconds
D, [2015-02-19T22:49:49.087795 #17613] DEBUG -- : Host 0.0.0.0 is found and up
D, [2015-02-19T22:49:49.088083 #17613] DEBUG -- : Host 172.17.0.13 is found and up
D, [2015-02-19T22:49:49.088312 #17613] DEBUG -- : Host 172.17.0.14 is found and up
D, [2015-02-19T22:49:49.088538 #17613] DEBUG -- : Host 172.17.0.24 is found and up
D, [2015-02-19T22:49:49.088772 #17613] DEBUG -- : Host 172.17.0.10 is found and up
D, [2015-02-19T22:49:49.088993 #17613] DEBUG -- : Host 172.17.0.10 metadata has been updated, it will be considered lost and found
D, [2015-02-19T22:49:49.089181 #17613] DEBUG -- : Host 172.17.0.10 is down and lost
D, [2015-02-19T22:49:49.089379 #17613] DEBUG -- : Host 172.17.0.10 is found and up
D, [2015-02-19T23:26:11.277249 #21625] DEBUG -- : Populating policies and listeners with initial endpoints
D, [2015-02-19T23:26:11.277531 #21625] DEBUG -- : Host 0.0.0.0 is found and up
I, [2015-02-19T23:26:11.277773 #21625] INFO -- : Establishing control connection
D, [2015-02-19T23:26:11.278125 #21625] DEBUG -- : Connecting to 0.0.0.0
I, [2015-02-19T23:26:11.281161 #21625] INFO -- : Host 0.0.0.0 doesn't support protocol version 3, downgrading
D, [2015-02-19T23:26:11.283493 #21625] DEBUG -- : Host 0.0.0.0 refused connection (Cassandra::Errors::AuthenticationError: Server requested authentication, but client was not configured to authenticate)
W, [2015-02-19T23:26:11.283754 #21625] WARN -- : Host 0.0.0.0 refused all connections
D, [2015-02-19T23:26:11.283934 #21625] DEBUG -- : Host 0.0.0.0 is down
D, [2015-02-19T23:26:11.284138 #21625] DEBUG -- : Starting to continuously refresh status of 0.0.0.0 in 0.5 seconds
D, [2015-02-19T23:26:11.284388 #21625] DEBUG -- : Connection to 0.0.0.0 failed (Cassandra::Errors::AuthenticationError: Server requested authentication, but client was not configured to authenticate)
Previous PASSWORD_AUTHENTICATOR_FQCN: org.apache.cassandra.auth.PasswordAuthenticator
New PASSWORD_AUTHENTICATOR_FQCN: io.ably.cassandra.auth.AblyAuthenticator
D, [2015-02-19T23:30:58.917928 #22234] DEBUG -- : Populating policies and listeners with initial endpoints
D, [2015-02-19T23:30:58.918221 #22234] DEBUG -- : Host 0.0.0.0 is found and up
I, [2015-02-19T23:30:58.918470 #22234] INFO -- : Establishing control connection
D, [2015-02-19T23:30:58.918877 #22234] DEBUG -- : Connecting to 0.0.0.0
I, [2015-02-19T23:30:58.929126 #22234] INFO -- : Host 0.0.0.0 doesn't support protocol version 3, downgrading
D, [2015-02-19T23:30:58.966395 #22234] DEBUG -- : 5 peer(s) found
D, [2015-02-19T23:30:58.966658 #22234] DEBUG -- : Host 0.0.0.0 metadata has been updated, it will be considered lost and found
D, [2015-02-19T23:30:58.966906 #22234] DEBUG -- : Host 0.0.0.0 is down and lost
D, [2015-02-19T23:30:58.967133 #22234] DEBUG -- : Starting to continuously refresh status of 0.0.0.0 in 0.5 seconds
D, [2015-02-19T23:30:58.967407 #22234] DEBUG -- : Host 0.0.0.0 is found and up
D, [2015-02-19T23:30:58.967680 #22234] DEBUG -- : Host 172.17.0.10 is found and up
D, [2015-02-19T23:30:58.967911 #22234] DEBUG -- : Host 172.17.0.10 metadata has been updated, it will be considered lost and found
D, [2015-02-19T23:30:58.968096 #22234] DEBUG -- : Host 172.17.0.10 is down and lost
D, [2015-02-19T23:30:58.968298 #22234] DEBUG -- : Host 172.17.0.10 is found and up
D, [2015-02-19T23:30:58.968536 #22234] DEBUG -- : Host 172.17.0.24 is found and up
D, [2015-02-19T23:30:58.968756 #22234] DEBUG -- : Host 172.17.0.13 is found and up
D, [2015-02-19T23:30:58.968970 #22234] DEBUG -- : Host 172.17.0.14 is found and up
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment