Skip to content

Instantly share code, notes, and snippets.

@joroKr21
Created February 8, 2015 19:31
Show Gist options
  • Save joroKr21/3f6ff46ed1cc1a77361b to your computer and use it in GitHub Desktop.
Save joroKr21/3f6ff46ed1cc1a77361b to your computer and use it in GitHub Desktop.
HBase OutputFormat for Apache Flink
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase._
import client._
import util.Bytes
import language.{ implicitConversions, reflectiveCalls }
import java.math.BigDecimal
import java.nio.ByteBuffer
abstract class HBaseOutputFormat[T](tableNames: String*)
(implicit config: Map[String, String])
extends OutputFormat[T] {
var tables = Map.empty[String, HTable]
def configure(parameters: Configuration) = ()
def open(taskNumber: Int, numTasks: Int) = {
val hConf = HBaseConfiguration.create
for { (key, value) <- config if key.startsWith("hbase") }
hConf.set(key, value)
for { name <- tableNames } tables += name -> new HTable(hConf, name)
}
def close() = try withResources(tables.values.toSeq: _*) {
_.foreach { _.flushCommits() }
} finally tables = Map.empty
def withResource [R <: { def close(): Unit }](resource: R )
(f: R => Unit) =
try f(resource) finally resource.close()
def withResources[R <: { def close(): Unit }](resources: R*)
(f: Seq[R] => Unit) =
resources.foldRight(f) { (r, f) =>
_ => withResource(r) { _ => f(resources) }
} (resources)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment