Skip to content

Instantly share code, notes, and snippets.

@TanUkkii007
Last active February 27, 2018 15:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TanUkkii007/bbd719ea6b665985880a9503a58b50a5 to your computer and use it in GitHub Desktop.
Save TanUkkii007/bbd719ea6b665985880a9503a58b50a5 to your computer and use it in GitHub Desktop.
LINE Developer Meetup in Tokyo #28 https://line.connpass.com/event/78912/
import java.util
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging}
import org.hbase.async.Scanner.ScanMetrics
import org.hbase.async.{KeyValue, Scanner}
import com.stumbleupon.async.Callback
import scala.collection.JavaConverters._
/**
 * Mix-in that lets plain Scala functions be passed where a
 * `com.stumbleupon.async.Callback` is expected.
 */
trait HBaseCallbackConversion {

  /**
   * Wraps `f` in a [[Callback]] whose `call` simply delegates to `f`.
   *
   * @param f the function to adapt
   * @tparam R the result type produced by the callback
   * @tparam T the argument type received by the callback
   * @return a callback that applies `f` to its argument
   */
  protected implicit def functionToCallback[R, T](f: T => R): Callback[R, T] =
    new Callback[R, T] {
      override def call(arg: T): R = f.apply(arg)
    }
}
/**
 * Source stage that emits the rows returned by an asynchbase [[Scanner]].
 *
 * Each call to `scanner.nextRows()` yields a batch of rows; the first row is pushed
 * immediately and the rest are buffered and pushed on subsequent pulls. The stream
 * completes when `nextRows` yields `null` or an empty batch, and fails when the
 * returned `Deferred` fails.
 *
 * @param scanner the scanner to read from; it is closed by this stage when the stage
 *                stops before the scan has finished
 */
class HBaseAsyncScanStage(scanner: Scanner)
    extends GraphStage[SourceShape[util.ArrayList[KeyValue]]] with HBaseCallbackConversion {

  val out: Outlet[util.ArrayList[KeyValue]] = Outlet("HBaseAsyncScanStage")

  override def shape: SourceShape[util.ArrayList[KeyValue]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      // These mutable variables are only modified from handlers, postStop and callbacks
      // obtained with getAsyncCallback.
      var buffer: List[util.ArrayList[KeyValue]] = List.empty
      var autoClosed = false

      // Hand-off token shared with the asynchbase (Netty) thread.
      // It is set to true right before nextRows() is issued. Both the Netty callback and
      // postStop try to flip it back to false; whoever LOSES that race is responsible for
      // closing the scanner:
      //  - Netty callback wins  -> result is forwarded to the stage; a later postStop
      //                            fails the CAS and closes the scanner itself.
      //  - postStop wins        -> nextRows is still running, so close cannot be issued
      //                            yet; the Netty callback fails the CAS and closes.
      // A single atomic token avoids the window in which the Netty thread has already
      // forwarded its result but the stage stops before processing it, which previously
      // left the scanner unclosed.
      private val requestInFlight = new java.util.concurrent.atomic.AtomicBoolean(false)

      // AsyncCallback can be reused inside GraphStageLogic.
      private val callback = getAsyncCallback[Option[util.ArrayList[util.ArrayList[KeyValue]]]] {
        case Some(results) if !results.isEmpty =>
          // Emit the first row now and keep the remainder for subsequent pulls.
          val element = results.remove(0)
          buffer = results.asScala.toList
          push(out, element)
        case _ =>
          // null (None) or an empty batch: the scan is treated as finished.
          // asynchbase automatically closes a scanner when the scan is finished normally.
          autoClosed = true
          complete(out)
      }

      // AsyncCallback can be reused inside GraphStageLogic.
      private val errorback = getAsyncCallback[Throwable] { error =>
        fail(out, error)
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            buffer match {
              case element :: rest =>
                buffer = rest
                push(out, element)
              case Nil =>
                // Must be set before the request is issued so the callback always sees it.
                requestInFlight.set(true)
                scanner.nextRows().addCallbacks(
                  (results: util.ArrayList[util.ArrayList[KeyValue]]) => {
                    if (requestInFlight.compareAndSet(true, false)) callback.invoke(Option(results))
                    else closeResources() // stage already stopped: close on its behalf
                  },
                  (e: Throwable) => {
                    if (requestInFlight.compareAndSet(true, false)) errorback.invoke(e)
                    else closeResources() // stage already stopped: close on its behalf
                  }
                )
            }
          }
        }
      )

      override def postStop(): Unit = {
        // If downstream finished earlier than this stage, the stream is cancelled and
        // postStop is called while nextRows may still be ongoing. In that case the close
        // request cannot be fired here; taking the token tells the pending callback to
        // close the scanner once nextRows has finished. Otherwise close right away.
        if (!requestInFlight.compareAndSet(true, false)) {
          closeResources()
        }
        super.postStop()
      }

      // Closes the scanner unless the scan already ran to completion.
      private def closeResources(): Unit = {
        if (!autoClosed) {
          scanner.close()
        }
      }
    }
}
/** Factory for a [[Source]] backed by [[HBaseAsyncScanStage]]. */
object HBaseAsyncScanSource {

  /**
   * Builds a source that emits the rows read from `scanner`.
   *
   * The source is named "HBaseAsyncScanSource", runs on the "hbase-dispatcher"
   * dispatcher and is placed behind an async boundary.
   *
   * @param scanner the asynchbase scanner to read rows from
   * @return a source of rows, each row being the list of its key-values
   */
  def apply(scanner: Scanner): Source[util.ArrayList[KeyValue], NotUsed] = {
    val stageAttributes: Attributes =
      Attributes
        .name("HBaseAsyncScanSource")
        .and(ActorAttributes.dispatcher("hbase-dispatcher"))
        .and(Attributes.asyncBoundary)

    val stage = new HBaseAsyncScanStage(scanner)
    Source.fromGraph(stage).withAttributes(stageAttributes)
  }
}
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
/**
 * Source stage that emits the [[Result]]s of a blocking HBase scan.
 *
 * The table and its scanner are opened in `preStart`, one result is read per pull,
 * and both resources are released in `postStop`. The stream completes when the
 * scanner returns `null`.
 *
 * @param connection HBase connection used to obtain the table
 * @param tableName  name of the table to scan
 * @param scan       scan definition passed to `Table.getScanner`
 */
class HBaseScanStage(connection: Connection, tableName: TableName, scan: Scan)
    extends GraphStage[SourceShape[Result]] {

  val out: Outlet[Result] = Outlet("HBaseScanSource")

  override def shape: SourceShape[Result] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Assigned in preStart; either may still be null in postStop if preStart failed.
      var table: Table = _
      var scanner: ResultScanner = _

      override def preStart(): Unit = {
        table = connection.getTable(tableName)
        scanner = table.getScanner(scan)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val next = scanner.next()
          if (next == null)
            complete(out) // null marks the end of the scan
          else
            push(out, next)
        }
      })

      override def postStop(): Unit = {
        // Nested try/finally so that a failure while closing the scanner can neither
        // leak the table nor skip super.postStop().
        try {
          if (scanner != null) scanner.close()
        } finally {
          try {
            if (table != null) table.close()
          } finally {
            super.postStop()
          }
        }
      }
    }
}
/** Factory for a [[Source]] backed by [[HBaseScanStage]]. */
object HBaseScanSource {

  /**
   * Builds a source that emits the results of scanning `tableName` with `scan`.
   *
   * The source is named "HBaseScanSource", runs on the "hbase-blocking-dispatcher"
   * dispatcher and is placed behind an async boundary.
   *
   * @param connection HBase connection used to obtain the table
   * @param tableName  name of the table to scan
   * @param scan       scan definition
   * @return a source of scan results
   */
  def apply(connection: Connection,
            tableName: TableName,
            scan: Scan): Source[Result, NotUsed] = {
    val stageAttributes: Attributes =
      Attributes
        .name("HBaseScanSource")
        .and(ActorAttributes.dispatcher("hbase-blocking-dispatcher"))
        .and(Attributes.asyncBoundary)

    val stage = new HBaseScanStage(connection, tableName, scan)
    Source.fromGraph(stage).withAttributes(stageAttributes)
  }
}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import scala.annotation.tailrec
/**
 * Runs a blocking HBase scan and collects every result into memory.
 *
 * @param connection HBase connection used to obtain the table
 * @param tableName  name of the table to scan
 * @param scan       scan definition passed to `Table.getScanner`
 * @return all results of the scan, in the order the scanner returned them
 */
def scanHBase(connection: Connection, tableName: TableName, scan: Scan): Vector[Result] = {
  // scalastyle:off
  val table: Table = connection.getTable(tableName)
  try {
    // Opened inside the table's try so the table is released even if getScanner throws.
    val scanner: ResultScanner = table.getScanner(scan)
    try {
      @tailrec
      def loop(results: Vector[Result]): Vector[Result] = {
        val result = scanner.next()
        if (result == null)
          results // null marks the end of the scan
        else
          loop(results :+ result)
      }
      loop(Vector.empty)
    } finally {
      // Release in reverse order of acquisition: scanner first, then the table.
      scanner.close()
    }
  } finally {
    table.close()
  }
  // scalastyle:on
}
import akka.stream.ThrottleMode
import HBaseAsyncScanSource
import org.hbase.async.{Config, HBaseClient, NotServingRegionException}
import scala.concurrent.duration._
// Usage examples for HBaseAsyncScanSource.
val hbaseClient: HBaseClient = new HBaseClient(new Config)

// A `def`, so every example below gets its own freshly created scanner.
def scanner: Scanner = hbaseClient.newScanner(tableName.toBytes)

// Stop after the first 1000 rows.
HBaseAsyncScanSource(scanner).take(1000)

// Limit the emission rate to 100 rows per second.
HBaseAsyncScanSource(scanner)
  .throttle(elements = 100, per = 1.second, maximumBurst = 100, ThrottleMode.Shaping)

// Fail the stream if it has not completed within 5 seconds.
HBaseAsyncScanSource(scanner).completionTimeout(5.seconds)

// On NotServingRegionException, switch to a source built on a new scanner (up to 10 times).
// NotServingRegionException is a class, so it has to be matched with a type pattern.
HBaseAsyncScanSource(scanner).recoverWithRetries(10, {
  case _: NotServingRegionException => HBaseAsyncScanSource(scanner)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment