Skip to content

Instantly share code, notes, and snippets.

@wjsl
Created March 9, 2016 00:33
Show Gist options
  • Save wjsl/93c8528e8f27bbeb31bf to your computer and use it in GitHub Desktop.
Save wjsl/93c8528e8f27bbeb31bf to your computer and use it in GitHub Desktop.
import java.io.{FileOutputStream, BufferedOutputStream, OutputStream, File}
import java.security.PrivilegedAction
import javax.security.auth.Subject
import com.google.common.io.Files
import org.apache.accumulo.cluster.ClusterUser
import org.apache.accumulo.core.client.impl.Namespaces
import org.apache.accumulo.core.client.{BatchWriterConfig, ClientConfiguration}
import org.apache.accumulo.core.client.security.tokens.KerberosToken
import org.apache.accumulo.core.conf.Property
import org.apache.accumulo.core.data.Mutation
import org.apache.accumulo.core.security.{NamespacePermission, Authorizations, SystemPermission}
import org.apache.accumulo.minicluster.impl.{MiniAccumuloClusterImpl, MiniAccumuloConfigImpl}
import org.apache.accumulo.minicluster.{MiniAccumuloCluster, MiniAccumuloConfig}
import org.apache.accumulo.server.security.handler.{KerberosAuthenticator, KerberosAuthorizor, KerberosPermissionHandler}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import java.util.{Map => JMap}
import scala.collection.SortedSet
import scala.reflect.macros.Context
import scala.util.{Failure, Try}
/**
* Created by bill on 1/11/16.
*/
import org.scalatest._
/**
 * Spec that exercises Kerberos authentication against a mini Accumulo cluster.
 *
 * Constructing the spec starts a `TestingKdc` and a single-tserver
 * `MiniAccumuloClusterImpl` configured for SASL/Kerberos (see
 * [[configureForKerberos]]). The tests are order-dependent: the "The KDC"
 * tests populate the shared `app1`/`app2` logins that later tests rely on.
 *
 * The mini cluster is stopped in [[afterAll]].
 */
class KerbSpec extends FlatSpec with BeforeAndAfterAll {

  // Silence the noisiest loggers so test output stays readable.
  Logger.getLogger("org.apache.zookeeper").setLevel(Level.OFF)
  Logger.getLogger("org.apache.directory").setLevel(Level.OFF)
  Logger.getLogger(classOf[ClientConfiguration]).setLevel(Level.OFF)

  val log = LoggerFactory.getLogger(getClass)

  val work_dir = Files.createTempDir()
  log.info("Working dir: {}", work_dir)

  val kdc = TestingKdc()
  kdc.start
  val root = kdc.getRootUser

  val accumuloConf = new MiniAccumuloConfigImpl(work_dir, "secret")
  accumuloConf.setNumTservers(1)

  val coreSite = new Configuration(false)
  configureForKerberos(accumuloConf, coreSite, kdc)

  // Hadoop's UserGroupInformation must be switched to Kerberos before any login.
  val ugiConf = new Configuration(false)
  ugiConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  UserGroupInformation.setConfiguration(ugiConf)

  val accumulo = new MiniAccumuloClusterImpl(accumuloConf)
  // core-site.xml is written into the cluster's conf dir before the cluster starts.
  flushCoreSite(accumuloConf.getConfDir, coreSite)
  accumulo.start()

  // Populated by the "The KDC" tests below; null until those tests have run.
  var app1: UserGroupInformation = null
  var app2: UserGroupInformation = null

  /**
   * Stops the mini cluster once every test has run.
   *
   * NOTE(review): the KDC started in the constructor is not stopped here because
   * `TestingKdc`'s shutdown API is not visible in this file -- confirm and add it.
   */
  override protected def afterAll(): Unit = {
    try accumulo.stop()
    finally super.afterAll()
  }

  it should "allow logging in as root" in {
    loginRootAsLoginUser()
  }

  it should "allow logging in as accumulo" in {
    val serverUser = kdc.getAccumuloServerUser
    UserGroupInformation.loginUserFromKeytab(serverUser.getPrincipal, serverUser.getKeytab.getAbsolutePath)
  }

  it should "connect to Accumulo as root" in {
    loginRootAsLoginUser()
    accumulo.getConnector(kdc.getRootUser.getPrincipal, new KerberosToken)
  }

  "The KDC" should "let us create an `app1` user" in {
    app1 = createPrincipalAndLogin("app1")
  }

  it should "let us create an `app2` user" in {
    app2 = createPrincipalAndLogin("app2")
  }

  "Accumulo" should "allow us to login as `app1`" in {
    assertCanConnect(app1)
  }

  it should "allow us to login as `app2`" in {
    assertCanConnect(app2)
  }

  // NOTE(review): the test name mentions app2 but the body acts as app1, and no
  // outcome is asserted -- confirm the intended user and expectation.
  it should "revoke app2's attempt to modify the root user" in {
    val rootUgi = loginRootUgi()
    val rootCon = accumulo.getConnector(rootUgi.getUserName, new KerberosToken)
    rootCon.securityOperations().grantSystemPermission(app1.getUserName, SystemPermission.GRANT)
    doAs(app1, () => {
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      con.securityOperations().revokeSystemPermission(rootUgi.getUserName, SystemPermission.CREATE_TABLE)
    })
    // Original author's note: "this will now throw an error" -- presumably because
    // root's CREATE_TABLE permission was just revoked (TODO confirm).
    rootCon.tableOperations().create("hopefully_I_can_create_this")
  }

  "Root" can "grant access to users Accumulo knows about" in {
    val rootUgi = loginRootUgi()
    doAs(rootUgi, () => {
      val con = accumulo.getConnector(rootUgi.getUserName, new KerberosToken)
      val sops = con.securityOperations()
      sops.grantSystemPermission(app1.getUserName, SystemPermission.CREATE_TABLE)
      sops.grantNamespacePermission(app1.getUserName, Namespaces.ACCUMULO_NAMESPACE, NamespacePermission.CREATE_TABLE)
    })
  }

  "App1" should "be able to create a table" in {
    doAs(app1, () => {
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      con.tableOperations().create("app1")
      assert(con.tableOperations().list().contains("app1"))
    })
  }

  it can "write to the app1 table" in {
    doAs(app1, () => {
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      val writer = con.createBatchWriter("app1", new BatchWriterConfig)
      // Close the writer even when adding the mutation fails.
      try {
        val mutation = new Mutation("r")
        mutation.put("", "", "v")
        writer.addMutation(mutation)
      } finally {
        writer.close()
      }
    })
  }

  it can "read data from the app1 table" in {
    doAs(app1, () => {
      // Explicit converters instead of the deprecated implicit JavaConversions.
      import scala.collection.JavaConverters._
      val con = accumulo.getConnector(app1.getUserName, new KerberosToken)
      val scanner = con.createScanner("app1", Authorizations.EMPTY)
      scanner.setRange(new org.apache.accumulo.core.data.Range())
      scanner.asScala.foreach(entry => log.info("{}", entry))
    })
  }

  // "Accumulo" can "handle a username that's different from the Kerberos principal" in {
  // doAs(app1, () => {
  // val con = accumulo.getConnector("app:user", new KerberosToken)
  // })
  // }

  /**
   * Runs `fn` as `user` via `UserGroupInformation.doAs`, discarding its result.
   *
   * @param user the login whose identity the function runs under
   * @param fn   the work to perform as that user
   */
  def doAs[T](user: UserGroupInformation, fn: () => T): Unit = {
    user.doAs(new PrivilegedAction[T] {
      override def run(): T = fn()
    })
  }

  /**
   * Applies the Kerberos/SASL settings to a mini cluster configuration.
   *
   * Does nothing if SASL is already enabled in the site config.
   *
   * @param cfg      mini cluster configuration to modify
   * @param coreSite Hadoop configuration that receives the "kerberos" authentication setting
   * @param kdc      KDC supplying the server and root users
   * @throws RuntimeException      if SSL is enabled in the site config
   * @throws IllegalStateException if `kdc` is null and SASL is not yet enabled
   */
  def configureForKerberos(cfg: MiniAccumuloConfigImpl, coreSite: Configuration, kdc: TestingKdc): Unit = {
    val krb5ConfKey = "java.security.krb5.conf"
    val krb5DebugKey = "sun.security.krb5.debug"
    val siteConfig: JMap[String, String] = cfg.getSiteConfig

    def isEnabled(property: Property): Boolean =
      "true".equalsIgnoreCase(siteConfig.get(property.getKey))

    if (isEnabled(Property.INSTANCE_RPC_SSL_ENABLED)) {
      throw new RuntimeException("Cannot use both SSL and SASL/Kerberos")
    }

    // Already configured for SASL: leave the configuration untouched.
    if (!isEnabled(Property.INSTANCE_RPC_SASL_ENABLED)) {
      if (null == kdc) {
        throw new IllegalStateException("MiniClusterKdc was null")
      }

      cfg.setProperty(Property.INSTANCE_RPC_SASL_ENABLED, "true")

      val serverUser: ClusterUser = kdc.getAccumuloServerUser
      cfg.setProperty(Property.GENERAL_KERBEROS_KEYTAB, serverUser.getKeytab.getAbsolutePath)
      cfg.setProperty(Property.GENERAL_KERBEROS_PRINCIPAL, serverUser.getPrincipal)
      cfg.setProperty(Property.INSTANCE_SECURITY_AUTHENTICATOR, classOf[KerberosAuthenticator].getName)
      cfg.setProperty(Property.INSTANCE_SECURITY_AUTHORIZOR, classOf[KerberosAuthorizor].getName)
      cfg.setProperty(Property.INSTANCE_SECURITY_PERMISSION_HANDLER, classOf[KerberosPermissionHandler].getName)
      cfg.setProperty(Property.TRACE_USER, serverUser.getPrincipal)
      cfg.setProperty(Property.TRACE_TOKEN_TYPE, KerberosToken.CLASS_NAME)

      // Forward this JVM's krb5 settings to the processes the mini cluster spawns.
      val systemProperties: JMap[String, String] = cfg.getSystemProperties
      systemProperties.put(krb5ConfKey, System.getProperty(krb5ConfKey, ""))
      systemProperties.put(krb5DebugKey, System.getProperty(krb5DebugKey, "true"))
      // log.info("LOOKING FOR KRB5: {}", systemProperties.get(krb5ConfKey))
      cfg.setSystemProperties(systemProperties)

      coreSite.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos")
      cfg.setRootUserName(kdc.getRootUser.getPrincipal)
    }
  }

  /**
   * Writes `conf` as `core-site.xml` into `dir`.
   *
   * @param dir  directory that receives the file
   * @param conf configuration to serialize
   * @throws RuntimeException if `core-site.xml` already exists in `dir`
   */
  def flushCoreSite(dir: File, conf: Configuration): Unit = {
    val csFile: File = new File(dir, "core-site.xml")
    if (csFile.exists) throw new RuntimeException(s"$csFile already exist")
    val out: OutputStream = new BufferedOutputStream(new FileOutputStream(csFile))
    // Close the stream even when serialization fails.
    try conf.writeXml(out)
    finally out.close()
  }

  /** Makes the KDC's root user the process-wide Hadoop login user. */
  private def loginRootAsLoginUser(): Unit = {
    val rootUser = kdc.getRootUser
    UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal, rootUser.getKeytab.getAbsolutePath)
  }

  /** Logs in the KDC's root user and returns the resulting login without replacing the login user. */
  private def loginRootUgi(): UserGroupInformation = {
    val rootUser = kdc.getRootUser
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal, rootUser.getKeytab.getAbsolutePath)
  }

  /** Creates `principal` in the KDC with a `<principal>.keytab` keytab and logs in with it. */
  private def createPrincipalAndLogin(principal: String): UserGroupInformation = {
    val keytab = new File(kdc.getKeytabDir, s"$principal.keytab")
    kdc.createPrincipal(keytab, principal)
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab.getAbsolutePath)
  }

  /** Fails the current test if `user` cannot obtain an Accumulo connector with a Kerberos token. */
  private def assertCanConnect(user: UserGroupInformation): Unit = {
    doAs(user, () => {
      val token = new KerberosToken()
      Try(accumulo.getConnector(user.getUserName, token)) match {
        case Failure(cause) => fail(cause)
        case _ => ()
      }
    })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment