Created
March 9, 2016 00:33
-
-
Save wjsl/93c8528e8f27bbeb31bf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{FileOutputStream, BufferedOutputStream, OutputStream, File} | |
import java.security.PrivilegedAction | |
import javax.security.auth.Subject | |
import com.google.common.io.Files | |
import org.apache.accumulo.cluster.ClusterUser | |
import org.apache.accumulo.core.client.impl.Namespaces | |
import org.apache.accumulo.core.client.{BatchWriterConfig, ClientConfiguration} | |
import org.apache.accumulo.core.client.security.tokens.KerberosToken | |
import org.apache.accumulo.core.conf.Property | |
import org.apache.accumulo.core.data.Mutation | |
import org.apache.accumulo.core.security.{NamespacePermission, Authorizations, SystemPermission} | |
import org.apache.accumulo.minicluster.impl.{MiniAccumuloClusterImpl, MiniAccumuloConfigImpl} | |
import org.apache.accumulo.minicluster.{MiniAccumuloCluster, MiniAccumuloConfig} | |
import org.apache.accumulo.server.security.handler.{KerberosAuthenticator, KerberosAuthorizor, KerberosPermissionHandler} | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.CommonConfigurationKeysPublic | |
import org.apache.hadoop.security.UserGroupInformation | |
import org.apache.log4j.{Level, Logger} | |
import org.slf4j.LoggerFactory | |
import java.util.{Map => JMap} | |
import scala.collection.SortedSet | |
import scala.reflect.macros.Context | |
import scala.util.{Failure, Try} | |
/** | |
* Created by bill on 1/11/16. | |
*/ | |
import org.scalatest._ | |
/**
 * Integration spec that boots a test KDC and a single-tserver
 * `MiniAccumuloClusterImpl` configured for SASL/Kerberos, then exercises
 * Kerberos logins, permission changes and basic table I/O.
 *
 * The KDC and the cluster are started while the suite instance is being
 * constructed. The test cases are order-dependent: later cases reuse the
 * `app1`/`app2` logins and the permissions established by earlier cases.
 */
class KerbSpec extends FlatSpec with BeforeAndAfterAll {

  // Silence the chattiest third-party loggers so the test output stays readable.
  Logger.getLogger("org.apache.zookeeper").setLevel(Level.OFF)
  Logger.getLogger("org.apache.directory").setLevel(Level.OFF)
  Logger.getLogger(classOf[ClientConfiguration]).setLevel(Level.OFF)

  val log = LoggerFactory.getLogger(getClass)

  val work_dir = Files.createTempDir()
  log.info("Working dir: {}", work_dir)

  val kdc = TestingKdc()
  kdc.start

  val root = kdc.getRootUser

  val accumuloConf = new MiniAccumuloConfigImpl(work_dir, "secret")
  accumuloConf.setNumTservers(1)

  val coreSite = new Configuration(false)
  configureForKerberos(accumuloConf, coreSite, kdc)

  // Hadoop's UGI must be switched to Kerberos in this JVM before any login.
  val ugiConf = new Configuration(false)
  ugiConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  UserGroupInformation.setConfiguration(ugiConf)

  val accumulo = new MiniAccumuloClusterImpl(accumuloConf)
  // core-site.xml has to be written into the conf dir before the cluster starts.
  flushCoreSite(accumuloConf.getConfDir, coreSite)
  accumulo.start()

  // Assigned by the "The KDC" test cases and read by every later case.
  var app1: UserGroupInformation = null
  var app2: UserGroupInformation = null

  /**
   * Stops the mini cluster once every test case has run so its processes do
   * not outlive the suite.
   */
  override def afterAll(): Unit = {
    // NOTE(review): the KDC started above is never stopped; TestingKdc's API is
    // not visible from this file, so confirm it has a stop method and call it here.
    try accumulo.stop()
    finally super.afterAll()
  }

  /**
   * Runs `fn` as `user` through `UserGroupInformation.doAs`.
   *
   * @param user the UGI whose credentials should be in effect while `fn` runs
   * @param fn   the action to execute; its result is discarded
   */
  def doAs[T](user: UserGroupInformation, fn: () => T): Unit = {
    user.doAs(new PrivilegedAction[T] {
      override def run(): T = fn()
    })
  }

  /** Creates principal `name` with a keytab in the KDC's keytab dir and logs it in. */
  private def createAndLogin(name: String): UserGroupInformation = {
    val keytab = new File(kdc.getKeytabDir, name + ".keytab")
    kdc.createPrincipal(keytab, name)
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(name, keytab.getAbsolutePath)
  }

  /** Logs the KDC's root user in from its keytab and returns the resulting UGI. */
  private def loginRootUgi(): UserGroupInformation = {
    val rootUser = kdc.getRootUser
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      rootUser.getPrincipal, rootUser.getKeytab.getAbsolutePath)
  }

  /** Fails the running test unless `user` can obtain an Accumulo connector. */
  private def assertCanConnect(user: UserGroupInformation): Unit =
    doAs(user, () => {
      Try(accumulo.getConnector(user.getUserName, new KerberosToken())) match {
        case Failure(cause) => fail(cause)
        case _              => ()
      }
    })

  it should "allow logging in as root" in {
    UserGroupInformation.loginUserFromKeytab(kdc.getRootUser.getPrincipal,
      kdc.getRootUser.getKeytab.getAbsolutePath)
  }

  it should "allow logging in as accumulo" in {
    UserGroupInformation.loginUserFromKeytab(kdc.getAccumuloServerUser.getPrincipal,
      kdc.getAccumuloServerUser.getKeytab.getAbsolutePath)
  }

  it should "connect to Accumulo as root" in {
    UserGroupInformation.loginUserFromKeytab(kdc.getRootUser.getPrincipal,
      kdc.getRootUser.getKeytab.getAbsolutePath)
    accumulo.getConnector(kdc.getRootUser.getPrincipal, new KerberosToken)
  }

  "The KDC" should "let us create an `app1` user" in {
    app1 = createAndLogin("app1")
  }

  it should "let us create an `app2` user" in {
    app2 = createAndLogin("app2")
  }

  "Accumulo" should "allow us to login as `app1`" in {
    assertCanConnect(app1)
  }

  it should "allow us to login as `app2`" in {
    assertCanConnect(app2)
  }

  it should "revoke app2's attempt to modify the root user" in {
    // NOTE(review): the title mentions app2 but the body acts as app1 - confirm
    // which one was intended.
    val rootUgi = loginRootUgi()
    val rootCon = accumulo.getConnector(rootUgi.getUserName, new KerberosToken)
    rootCon.securityOperations.grantSystemPermission(app1.getUserName, SystemPermission.GRANT)
    doAs(app1, () => {
      val app1Con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      app1Con.securityOperations.revokeSystemPermission(rootUgi.getUserName, SystemPermission.CREATE_TABLE)
    })
    // Original comment: "this will now throw an error".
    // NOTE(review): no exception is expected or caught here, so a throwing
    // create() fails this case - confirm whether that is the intended outcome.
    rootCon.tableOperations.create("hopefully_I_can_create_this")
  }

  "Root" can "grant access to users Accumulo knows about" in {
    val rootUgi = loginRootUgi()
    doAs(rootUgi, () => {
      val con = accumulo.getConnector(rootUgi.getUserName, new KerberosToken)
      val sops = con.securityOperations()
      sops.grantSystemPermission(app1.getUserName, SystemPermission.CREATE_TABLE)
      sops.grantNamespacePermission(app1.getUserName, Namespaces.ACCUMULO_NAMESPACE, NamespacePermission.CREATE_TABLE)
    })
  }

  "App1" should "be able to create a table" in {
    doAs(app1, () => {
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      con.tableOperations().create("app1")
      assert(con.tableOperations().list().contains("app1"))
    })
  }

  it can "write to the app1 table" in {
    doAs(app1, () => {
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      val bw = con.createBatchWriter("app1", new BatchWriterConfig)
      // Close the writer even when adding the mutation fails.
      try {
        val m = new Mutation("r")
        m.put("", "", "v")
        bw.addMutation(m)
      } finally {
        bw.close()
      }
    })
  }

  it can "read data from the app1 table" in {
    doAs(app1, () => {
      // Explicit converters instead of the deprecated implicit JavaConversions.
      import scala.collection.JavaConverters._
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      val s = con.createScanner("app1", Authorizations.EMPTY)
      try {
        s.setRange(new org.apache.accumulo.core.data.Range())
        s.asScala.foreach(entry => log.info("{}", entry))
      } finally {
        s.close()
      }
    })
  }

  // "Accumulo" can "handle a username that's different from the Kerberos principal" in {
  //   doAs(app1, () => {
  //     val con = accumulo.getConnector("app:user", new KerberosToken)
  //   })
  // }

  /**
   * Applies the SASL/Kerberos settings to a mini cluster configuration.
   *
   * Does nothing when SASL is already enabled in the site config.
   *
   * @param cfg      mini cluster configuration to mutate
   * @param coreSite Hadoop configuration that receives the "kerberos" authentication setting
   * @param kdc      source of the server and root principals and keytabs
   * @throws RuntimeException      if RPC SSL is enabled in the site config
   * @throws IllegalStateException if `kdc` is null and SASL is not already enabled
   */
  def configureForKerberos(cfg: MiniAccumuloConfigImpl, coreSite: Configuration, kdc: TestingKdc): Unit = {
    val JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"
    val SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug"

    val siteConfig: JMap[String, String] = cfg.getSiteConfig
    def isEnabled(property: Property): Boolean =
      "true".equalsIgnoreCase(siteConfig.get(property.getKey))

    if (isEnabled(Property.INSTANCE_RPC_SSL_ENABLED)) {
      throw new RuntimeException("Cannot use both SSL and SASL/Kerberos")
    }

    // Nothing left to do when SASL has already been switched on.
    if (!isEnabled(Property.INSTANCE_RPC_SASL_ENABLED)) {
      if (null == kdc) {
        throw new IllegalStateException("MiniClusterKdc was null")
      }

      cfg.setProperty(Property.INSTANCE_RPC_SASL_ENABLED, "true")

      val serverUser: ClusterUser = kdc.getAccumuloServerUser
      cfg.setProperty(Property.GENERAL_KERBEROS_KEYTAB, serverUser.getKeytab.getAbsolutePath)
      cfg.setProperty(Property.GENERAL_KERBEROS_PRINCIPAL, serverUser.getPrincipal)
      cfg.setProperty(Property.INSTANCE_SECURITY_AUTHENTICATOR, classOf[KerberosAuthenticator].getName)
      cfg.setProperty(Property.INSTANCE_SECURITY_AUTHORIZOR, classOf[KerberosAuthorizor].getName)
      cfg.setProperty(Property.INSTANCE_SECURITY_PERMISSION_HANDLER, classOf[KerberosPermissionHandler].getName)
      cfg.setProperty(Property.TRACE_USER, serverUser.getPrincipal)
      cfg.setProperty(Property.TRACE_TOKEN_TYPE, KerberosToken.CLASS_NAME)

      // Copy this JVM's krb5 settings into the system properties of the cluster config.
      val systemProperties: JMap[String, String] = cfg.getSystemProperties
      systemProperties.put(JAVA_SECURITY_KRB5_CONF, System.getProperty(JAVA_SECURITY_KRB5_CONF, ""))
      systemProperties.put(SUN_SECURITY_KRB5_DEBUG, System.getProperty(SUN_SECURITY_KRB5_DEBUG, "true"))
      // log.info("LOOKING FOR KRB5: {}", systemProperties.get(JAVA_SECURITY_KRB5_CONF))
      cfg.setSystemProperties(systemProperties)

      coreSite.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos")
      cfg.setRootUserName(kdc.getRootUser.getPrincipal)
    }
  }

  /**
   * Writes `conf` as `core-site.xml` into `dir`.
   *
   * @param dir  directory that receives the file
   * @param conf Hadoop configuration to serialise
   * @throws RuntimeException if `core-site.xml` already exists in `dir`
   */
  def flushCoreSite(dir: File, conf: Configuration): Unit = {
    val csFile: File = new File(dir, "core-site.xml")
    if (csFile.exists) throw new RuntimeException(csFile + " already exist")
    val out: OutputStream = new BufferedOutputStream(new FileOutputStream(csFile))
    // Close the stream even when serialisation fails so the handle is not leaked.
    try conf.writeXml(out)
    finally out.close()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment