Last active
February 27, 2018 15:50
-
-
Save TanUkkii007/bbd719ea6b665985880a9503a58b50a5 to your computer and use it in GitHub Desktop.
LINE Developer Meetup in Tokyo #28 https://line.connpass.com/event/78912/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util | |
import akka.NotUsed | |
import akka.stream.scaladsl.Source | |
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape} | |
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging} | |
import org.hbase.async.Scanner.ScanMetrics | |
import org.hbase.async.{KeyValue, Scanner} | |
import com.stumbleupon.async.Callback | |
import scala.collection.JavaConverters._ | |
/**
 * Mixin that lets a plain Scala function be passed wherever asynchbase's Deferred API
 * expects a [[com.stumbleupon.async.Callback]].
 */
trait HBaseCallbackConversion {

  /**
   * Wraps `f` in a `Callback` whose `call` simply delegates to `f`.
   *
   * @tparam R result type produced by the callback
   * @tparam T argument type received by the callback
   */
  protected implicit def functionToCallback[R, T](f: T => R): Callback[R, T] =
    new Callback[R, T] {
      override def call(input: T): R = f.apply(input)
    }
}
/**
 * Akka Streams source stage that drains an asynchbase [[Scanner]] without blocking.
 *
 * Each downstream pull is served from rows buffered by the previous `nextRows()` call.
 * When the buffer is empty, a new `nextRows()` request is issued. Its outcome arrives on an
 * asynchbase (Netty) thread and is handed back to the stage through an `AsyncCallback`.
 *
 * The stage owns the scanner. It closes the scanner when it stops early, unless asynchbase
 * has already closed it because the scan finished normally.
 *
 * @param scanner the scanner to drain; must not be shared with other consumers
 */
class HBaseAsyncScanStage(scanner: Scanner)
    extends GraphStage[SourceShape[util.ArrayList[KeyValue]]]
    with HBaseCallbackConversion {
  import java.util.concurrent.atomic.AtomicInteger

  val out: Outlet[util.ArrayList[KeyValue]] = Outlet("HBaseAsyncScanStage")

  override def shape: SourceShape[util.ArrayList[KeyValue]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {

      // Ownership of the scanner's lifecycle, shared between the stage thread and the
      // asynchbase callback thread. Every transition is atomic, so exactly one side decides
      // whether to close the scanner.
      //   Idle       - no request in flight; postStop closes the scanner.
      //   Requesting - nextRows() is in flight; whichever side leaves this state first wins.
      //   Finished   - the scan ended normally; asynchbase closes the scanner itself.
      //   Stopped    - the stage stopped; a late nextRows() result must close the scanner.
      private val Idle = 0
      private val Requesting = 1
      private val Finished = 2
      private val Stopped = 3
      private val requestState = new AtomicInteger(Idle)

      // Rows fetched by the last nextRows() call but not yet pushed.
      // Only accessed on the stage's own thread (handlers, AsyncCallbacks, postStop).
      private var buffer: List[util.ArrayList[KeyValue]] = Nil

      // Delivers a nextRows() result to the stage. A null or empty batch means the scan is done.
      private val onRows = getAsyncCallback[Option[util.ArrayList[util.ArrayList[KeyValue]]]] {
        case Some(results) if !results.isEmpty =>
          val rows = results.asScala.toList
          buffer = rows.tail
          push(out, rows.head)
        case _ =>
          complete(out)
      }

      // Delivers a nextRows() failure to the stage.
      private val onFailure = getAsyncCallback[Throwable] { error =>
        fail(out, error)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = buffer match {
          case head :: tail =>
            buffer = tail
            push(out, head)
          case Nil =>
            requestNextRows()
        }
      })

      override def postStop(): Unit = {
        // Publishing Stopped tells a still-running nextRows() callback that it must close the
        // scanner itself. If Idle was observed, no request is in flight and the scanner is open,
        // so it is closed here. Requesting means the callback side closes it. Finished means
        // asynchbase already closed it.
        if (requestState.getAndSet(Stopped) == Idle) {
          scanner.close()
        }
        super.postStop()
      }

      /** Issues one nextRows() call and routes its outcome back to the stage. Stage thread only. */
      private def requestNextRows(): Unit = {
        requestState.set(Requesting)
        scanner.nextRows().addCallbacks(
          (results: util.ArrayList[util.ArrayList[KeyValue]]) => {
            val scanFinished = results == null || results.isEmpty
            handOver(scanFinished)(onRows.invoke(Option(results)))
          },
          (e: Throwable) => {
            handOver(scanFinished = false)(onFailure.invoke(e))
          }
        )
      }

      /**
       * Runs on the asynchbase callback thread.
       *
       * If the stage is still waiting for this result, this wins the handshake and runs
       * `deliver`. Otherwise the stage has already stopped and left the closing of the scanner
       * to us. The close is skipped when the scan finished, because asynchbase then closes the
       * scanner on its own.
       */
      private def handOver(scanFinished: Boolean)(deliver: => Unit): Unit = {
        val next = if (scanFinished) Finished else Idle
        if (requestState.compareAndSet(Requesting, next)) {
          deliver
        } else if (!scanFinished) {
          scanner.close()
        }
      }
    }
}
/** Factory for non-blocking HBase scan sources backed by [[HBaseAsyncScanStage]]. */
object HBaseAsyncScanSource {

  /**
   * Wraps `scanner` in a Source that emits one element per scanned row. Each element is the
   * row's list of `KeyValue`s.
   *
   * The stage runs on the "hbase-dispatcher" dispatcher behind an async boundary.
   *
   * @param scanner a fresh scanner; the resulting Source takes ownership of it
   */
  def apply(scanner: Scanner): Source[util.ArrayList[KeyValue], NotUsed] = {
    val stageAttributes = Attributes
      .name("HBaseAsyncScanSource")
      .and(ActorAttributes.dispatcher("hbase-dispatcher"))
      .and(Attributes.asyncBoundary)
    val stage = new HBaseAsyncScanStage(scanner)
    Source.fromGraph(stage).withAttributes(stageAttributes)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.stream.scaladsl.Source | |
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape} | |
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} | |
import org.apache.hadoop.hbase.TableName | |
import org.apache.hadoop.hbase.client._ | |
/**
 * Akka Streams source stage that emits every [[Result]] of `scan` over `tableName`.
 *
 * It uses the synchronous HBase client: `ResultScanner.next()` is called directly in
 * `onPull`, so the stage should run on a dispatcher dedicated to blocking work.
 *
 * The table and the scanner are opened in `preStart` and always released in `postStop`.
 */
class HBaseScanStage(connection: Connection, tableName: TableName, scan: Scan)
    extends GraphStage[SourceShape[Result]] {

  val out: Outlet[Result] = Outlet("HBaseScanSource")

  override def shape: SourceShape[Result] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Java handles opened in preStart. They stay null if opening fails part-way, which is how
      // postStop knows what actually needs releasing.
      private var table: Table = _
      private var scanner: ResultScanner = _

      override def preStart(): Unit = {
        table = connection.getTable(tableName)
        scanner = table.getScanner(scan)
      }

      setHandler(out, new OutHandler {
        // ResultScanner.next() returns null once the scan is exhausted.
        override def onPull(): Unit = Option(scanner.next()) match {
          case Some(row) => push(out, row)
          case None      => complete(out)
        }
      })

      override def postStop(): Unit = {
        // Nested finally blocks ensure a failing scanner.close() can neither leak the table
        // nor skip the superclass cleanup.
        try {
          if (scanner != null) scanner.close()
        } finally {
          try {
            if (table != null) table.close()
          } finally {
            super.postStop()
          }
        }
      }
    }
}
/** Factory for blocking HBase scan sources backed by [[HBaseScanStage]]. */
object HBaseScanSource {

  /**
   * Creates a Source emitting each [[Result]] produced by `scan` over `tableName`.
   *
   * The stage runs on the "hbase-blocking-dispatcher" dispatcher behind an async boundary.
   *
   * @param connection open HBase connection used to obtain the table
   * @param tableName  table to scan
   * @param scan       scan specification
   */
  def apply(connection: Connection, tableName: TableName, scan: Scan): Source[Result, NotUsed] = {
    val stageAttributes = Attributes
      .name("HBaseScanSource")
      .and(ActorAttributes.dispatcher("hbase-blocking-dispatcher"))
      .and(Attributes.asyncBoundary)
    val stage = new HBaseScanStage(connection, tableName, scan)
    Source.fromGraph(stage).withAttributes(stageAttributes)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.hbase.TableName | |
import org.apache.hadoop.hbase.client._ | |
import scala.annotation.tailrec | |
/**
 * Synchronously runs `scan` against `tableName` and collects every [[Result]] into a Vector.
 *
 * The scanner and the table are always closed before returning, scanner first. This holds
 * even if opening the scanner, reading from it, or closing the scanner throws.
 *
 * @param connection open HBase connection used to obtain the table
 * @param tableName  table to scan
 * @param scan       scan specification
 * @return all results in scan order
 */
def scanHBase(connection: Connection, tableName: TableName, scan: Scan): Vector[Result] = {
  // scalastyle:off
  val table: Table = connection.getTable(tableName)
  try {
    // Opened inside the outer try so the table is still released if getScanner throws.
    val scanner: ResultScanner = table.getScanner(scan)
    try {
      // ResultScanner.next() signals the end of the scan by returning null.
      Iterator.continually(scanner.next()).takeWhile(_ != null).toVector
    } finally {
      scanner.close()
    }
  } finally {
    table.close()
  }
  // scalastyle:on
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.ThrottleMode | |
import HBaseAsyncScanSource | |
import org.hbase.async.{Config, HBaseClient, NotServingRegionException} | |
import scala.concurrent.duration._ | |
import org.hbase.async.Scanner

// Usage examples for HBaseAsyncScanSource.
// NOTE(review): `tableName` is not defined in this snippet; it is assumed to be an
// org.apache.hadoop.hbase.TableName provided by the surrounding code. Confirm before use.
val hbaseClient: HBaseClient = new HBaseClient(new Config)

// A `def` rather than a `val`: every call creates a fresh scanner. Each source below, and each
// retry attempt, needs its own scanner because the stage consumes and closes the one it is given.
def scanner: Scanner = hbaseClient.newScanner(tableName.toBytes)

// Emit at most the first 1000 rows.
HBaseAsyncScanSource(scanner).take(1000)

// Emit at most 100 rows per second (bursts up to 100), shaping rather than failing.
HBaseAsyncScanSource(scanner)
  .throttle(elements = 100, per = 1.second, maximumBurst = 100, mode = ThrottleMode.Shaping)

// Fail the stream if the scan has not completed within 5 seconds.
HBaseAsyncScanSource(scanner).completionTimeout(5.seconds)

// On NotServingRegionException, switch to a new scan with a fresh scanner, up to 10 times.
// A type pattern is required: NotServingRegionException is a Java class, not a value.
// NOTE(review): the retry scan starts from the scanner's beginning, so rows may be re-emitted.
HBaseAsyncScanSource(scanner).recoverWithRetries(10, {
  case _: NotServingRegionException => HBaseAsyncScanSource(scanner)
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment