Skip to content

Instantly share code, notes, and snippets.

@rweeks
Created October 11, 2016 02:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rweeks/aaf110bb73837fe0ec6675d495bc8338 to your computer and use it in GitHub Desktop.
Save rweeks/aaf110bb73837fe0ec6675d495bc8338 to your computer and use it in GitHub Desktop.
Not-quite-pseudocode-not-quite-code showing how to go from a Transpose table to a Data table in Spark
package variantspark
import collection.JavaConverters._
import com.google.common.primitives.UnsignedBytes
import org.apache.accumulo.core.client.{IteratorSetting, ZooKeeperInstance}
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat
import org.apache.accumulo.core.client.mapreduce.impl.DelegationTokenStub
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase._
import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator
import org.apache.accumulo.core.data.{Key, Value, Range => ARange}
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.{HashPartitioner, SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD
/**
 * Resolves row-id references read from the Variant transpose table into rows of the Variant
 * table, using Spark to distribute the work and an Accumulo BatchScanner to do the lookups.
 *
 * The class is Serializable because `findEntities` hands `dereferenceVariantIds` to
 * `mapPartitions`; the resulting closure captures `this`, and Spark has to serialize it in order
 * to ship it to the executors.
 *
 * NOTE(review): `VariantTable`, `VariantTransposeTable`, `jobBuilder`, `AttrQueryParser` and
 * `extractVariantId` are not defined in this file; they are assumed to be resolvable from the
 * enclosing package.
 */
class VariantDereferencer extends Serializable {

  /**
   * Dereferences one partition of transpose-table entries against the Variant table.
   *
   * Intended to be partially applied and passed to `mapPartitions`: the first parameter list is
   * fixed on the driver, the second one receives the partition on the executor.
   *
   * @param batchSize             number of row ids looked up per BatchScanner round trip; must
   *                              be positive
   * @param numDereferenceThreads number of query threads given to the BatchScanner
   * @param wConf                 Hadoop configuration carrying the Accumulo connection settings
   * @param wCreds                credentials added to the current user before connecting
   * @param eIdPart               (row id, transpose key) pairs; only the row id is used
   * @return the key/value pairs returned by the scans; a WholeRowIterator is attached to the
   *         scanner, so each pair is an encoded whole row
   */
  def dereferenceVariantIds(batchSize: Int, numDereferenceThreads: Int,
                            wConf: SerializableWritable[HadoopConfiguration],
                            wCreds: SerializableWritable[Credentials])
                           (eIdPart: Iterator[(Array[Byte], Key)]): Iterator[(Key, Value)] = {
    // Fail before opening any connection; Iterator.grouped would reject this value anyway.
    require(batchSize > 0, s"batchSize must be positive, got $batchSize")

    val (conf, creds) = (wConf.value, wCreds.value)
    val principal = getPrincipal(classOf[AccumuloInputFormat], conf)
    val delTokenStub = getAuthenticationToken(classOf[AccumuloInputFormat], conf)
    // The credentials are added to the current user before the token stub is unwrapped below.
    UserGroupInformation.getCurrentUser.addCredentials(creds)
    val clientConfig = getClientConfiguration(classOf[AccumuloInputFormat], conf)
    val scanAuths = InputConfigurator.getScanAuthorizations(classOf[AccumuloInputFormat], conf)
    val delToken = unwrapAuthenticationToken(Job.getInstance(conf), delTokenStub)
    val conn = new ZooKeeperInstance(clientConfig).getConnector(principal, delToken)

    val scanner = conn.createBatchScanner(VariantTable, scanAuths, numDereferenceThreads)
    scanner.addScanIterator(new IteratorSetting(
      40, "WholeRowIterator", classOf[org.apache.accumulo.core.iterators.user.WholeRowIterator]))

    // Close the scanner exactly once, whichever path gets there first.
    val released = new java.util.concurrent.atomic.AtomicBoolean(false)
    def release(): Unit = if (released.compareAndSet(false, true)) scanner.close()

    // Safety net: if the task fails or the consumer stops before draining the iterator, the
    // scanner is still closed when the task ends. TaskContext.get() is null outside a task.
    Option(org.apache.spark.TaskContext.get()).foreach { taskContext =>
      taskContext.addTaskCompletionListener(new org.apache.spark.util.TaskCompletionListener {
        override def onTaskCompletion(context: org.apache.spark.TaskContext): Unit = release()
      })
    }

    val rows: Iterator[(Key, Value)] = eIdPart.grouped(batchSize).flatMap { keyGroup =>
      scanner.setRanges(keyGroup.map(ek => ARange.exact(new Text(ek._1))).asJava)
      // Materialise the whole batch now: the same scanner is re-targeted by the next setRanges
      // call, so results must not be read lazily across batches.
      scanner.asScala.iterator.map(entry => (entry.getKey, entry.getValue)).toVector
    }

    // Thin wrapper whose only job is to close the scanner as soon as the results run out.
    new Iterator[(Key, Value)] {
      override def hasNext: Boolean = {
        val more = rows.hasNext
        if (!more) release()
        more
      }

      override def next(): (Key, Value) = rows.next()
    }
  }

  /**
   * Applies a query to the Variant transpose table and dereferences the matching entries to
   * rows of the Variant table. Meant to be called from a Spark driver.
   *
   * @param sc                    Spark context used to create the RDD
   * @param query                 query text handed to `AttrQueryParser` to derive scan ranges
   * @param batchSize             number of row ids looked up per BatchScanner round trip
   * @param numDereferenceThreads number of query threads per BatchScanner
   * @return an RDD of the key/value pairs produced by `dereferenceVariantIds`
   */
  def findEntities(sc: SparkContext,
                   query: String,
                   batchSize: Int = 1000,
                   numDereferenceThreads: Int = 2): RDD[(Key, Value)] = {
    val job = jobBuilder.prepareJob()
      .forTable(VariantTransposeTable)
      .forRanges(new AttrQueryParser().newRangeFromQuery(query).toList)

    // Build the Hadoop job once so the configuration and the credentials shipped to the
    // executors come from the same Job instance.
    val hadoopJob = job.create()
    val jc = hadoopJob.getConfiguration
    val creds = hadoopJob.getCredentials

    // Picked up through Ordering.comparatorToOrdering to provide the Ordering[Array[Byte]]
    // that repartitionAndSortWithinPartitions requires.
    implicit val arrayOrdering: java.util.Comparator[Array[Byte]] =
      UnsignedBytes.lexicographicalComparator()

    // NOTE(review): the keys are Array[Byte], and JVM arrays hash by identity, so the
    // HashPartitioner presumably spreads records arbitrarily rather than by row id. That only
    // affects locality, since sorting happens within each partition - confirm this is intended.
    job.createRDD(sc)
      .map(x => (extractVariantId(x._1), x._1))
      .repartitionAndSortWithinPartitions(new HashPartitioner(sc.defaultParallelism))
      .mapPartitions(dereferenceVariantIds(
        batchSize, numDereferenceThreads,
        new SerializableWritable(jc),
        new SerializableWritable(creds)
      ))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment