Skip to content

Instantly share code, notes, and snippets.

@rweeks
Created July 15, 2016 20:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rweeks/bedb96e20fcb8d2d2f27c555494181c5 to your computer and use it in GitHub Desktop.
Save rweeks/bedb96e20fcb8d2d2f27c555494181c5 to your computer and use it in GitHub Desktop.
Delete all key-value pairs for a single locality group in Accumulo.
// this code is in the public domain
package com.newbrightidea.accumulo
import java.io.IOException
import java.util
import org.apache.accumulo.core.data.{ByteSequence, Key, Range, Value}
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator, WrappingIterator}
import scala.collection.JavaConverters._
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.accumulo.core.client.security.tokens.KerberosToken
import org.apache.accumulo.core.client.{ClientConfiguration, IteratorSetting, ZooKeeperInstance}
object LocalityGroupDeleter {
  /** Iterator option key under which the target locality group's column families are passed. */
  val OptLgColFams = "lgColFams"

  /**
   * Command-line entry point: runs a compaction of `tableName` with a [[LocalityGroupDeleter]]
   * iterator attached, so that the key-value pairs in the locality group whose column families are
   * `lgCols` are not written back.
   *
   * Usage: `LocalityGroupDeleter <tableName> <cf1,cf2,...>`
   *
   * The comma-separated column-family list must equal the locality group's configured families
   * exactly; the iterator compares the seeked family set against it for equality, so any other list
   * deletes nothing.
   *
   * Declared to return `Unit` so the JVM recognizes it as an entry point. A wrong argument count
   * prints usage to stderr and exits with status 1.
   */
  def main(args: Array[String]): Unit = args match {
    case Array(tableName, lgCols) =>
      // NOTE(review): the no-arg KerberosToken presumably uses the current Kerberos login — confirm.
      val krbToken = new KerberosToken()
      // NOTE(review): loadDefault presumably reads client config from Accumulo's default locations.
      val accConn = new ZooKeeperInstance(ClientConfiguration.loadDefault())
        .getConnector(krbToken.getPrincipal, krbToken)
      val deleter = new IteratorSetting(50, "LocalityGroupDeleter", classOf[LocalityGroupDeleter],
        Map(OptLgColFams -> lgCols).asJava)
      // Arguments: table, start row, end row (null = whole table), iterators, flush, wait.
      // flush = true and wait = true, so this call returns only after the compaction completes.
      accConn.tableOperations().compact(tableName, null, null, List(deleter).asJava, true, true)
    case _ =>
      System.err.println("Usage: LocalityGroupDeleter <tableName> <comma-separated column families>")
      sys.exit(1)
  }
}
/**
 * Iterator that hides every key-value pair belonging to one locality group. When run during a
 * compaction, that group's data is therefore not rewritten.
 *
 * A seek whose column families equal the configured set, with `inclusive = true`, makes
 * [[hasTop]] report no data. Every other seek passes through to the wrapped source unchanged.
 *
 * NOTE(review): this relies on Accumulo's compactor seeking each locality group with that group's
 * families inclusively, and the default group with all configured groups' families exclusively.
 * Confirm this for the Accumulo version in use.
 *
 * Required option: `lgColFams` — comma-separated column families of the locality group to delete.
 */
class LocalityGroupDeleter extends WrappingIterator {
  import LocalityGroupDeleter._

  /** Column families (decoded as UTF-8) of the locality group to suppress. */
  var lgColFams: Set[String] = Set[String]()

  /** True while the most recent seek selected exactly the configured locality group. */
  var seekCfMatchesLg: Boolean = false

  /**
   * Initializes the wrapped source and parses the `lgColFams` option.
   *
   * Empty tokens (for example from a leading or doubled comma) are discarded, so they cannot add
   * `""` to the set.
   *
   * @throws IOException if the option is missing or names no column families
   */
  override def init(source: SortedKeyValueIterator[Key, Value], options: util.Map[String, String],
                    env: IteratorEnvironment): Unit = {
    super.init(source, options, env)
    if (!options.containsKey(OptLgColFams)) {
      throw new IOException("Specify LG column families as comma-separated list using " + OptLgColFams)
    }
    val spec = Option(options.get(OptLgColFams)).getOrElse("")
    val fams = spec.split(",").iterator.filter(_.nonEmpty).toSet
    if (fams.isEmpty) {
      throw new IOException("No column families given in option " + OptLgColFams)
    }
    lgColFams = fams
  }

  /**
   * Seeks the wrapped source, then records whether this seek targets the locality group to delete.
   *
   * Only an inclusive seek selects a group. An exclusive seek with the same families means
   * "everything except these" (e.g. the default locality group when only one group is configured),
   * and suppressing it would delete the wrong data.
   */
  override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    super.seek(range, columnFamilies, inclusive)
    seekCfMatchesLg = inclusive && {
      val seekColFams = columnFamilies.asScala.map(cf => new String(cf.toArray, UTF_8)).toSet
      seekColFams == lgColFams
    }
  }

  /** Reports no data while positioned in the target locality group; otherwise delegates. */
  override def hasTop: Boolean = !seekCfMatchesLg && super.hasTop

  /**
   * Returns an independent copy sharing the same configuration over a deep copy of the source.
   *
   * Overridden because `WrappingIterator` does not provide a usable `deepCopy`.
   */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] = {
    val copy = new LocalityGroupDeleter
    copy.setSource(getSource().deepCopy(env))
    copy.lgColFams = lgColFams
    copy
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment