In this example we create an Akka Sink that writes messages to HBase.
Official documentation: https://developer.lightbend.com/docs/alpakka/current/hbase.html
Add the following dependency:
"com.lightbend.akka" %% "akka-stream-alpakka-hbase" % "0.19"
Let's say that you have a Kafka topic sample-msg with messages composed of a String key and a String value, and you want to write them to an HBase table called messages, using the message key as the row key and the message value as a description column in the data family. In a real-world scenario you will probably want to deserialize the message value into a more complex object and map it to one or more columns.
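As a minimal sketch of that richer scenario, suppose the Kafka value were a pipe-delimited string such as "some text|42" (the format, the RichMsg case class, and the parse helper below are assumptions for illustration, not part of the example that follows): each parsed field would then become its own column in the data family.

```scala
// Hypothetical richer message: each field would map to its own
// addColumn call in the HBase converter. Field names are assumptions.
case class RichMsg(id: String, description: String, count: Int)

// Parse a pipe-delimited Kafka value like "some text|42" into RichMsg,
// using the Kafka record key as the id / HBase row key.
def parse(key: String, value: String): RichMsg = {
  val Array(description, count) = value.split('|')
  RichMsg(key, description, count.trim.toInt)
}
```

In the converter you would then emit one `put.addColumn` per field instead of a single description column.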
The main App.scala file:
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.scaladsl.HTableStage
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
case class SampleMsg(id: String, description: String)
object App {
def main(args: Array[String]): Unit = {
println("Hello from hBaseWriter")
implicit val system: ActorSystem = ActorSystem("hBaseWriter")
implicit val materializer: Materializer = ActorMaterializer()
// For now assume string, string inside kafka message
val consumerSettings =
ConsumerSettings[String, String](system, new StringDeserializer, new StringDeserializer)
// table is automatically created the first time
val tableSettings =
HTableSettings(HBaseConfiguration.create(), TableName.valueOf("messages"), Seq("data"), hBaseConverter)
val done = Consumer
.atMostOnceSource(consumerSettings, Subscriptions.topics("sample-msg"))
.map(r => SampleMsg(r.key(), r.value()))
// .map(m => { println(s"Importing $m"); m }) // just for debug!
.runWith(HTableStage.sink[SampleMsg](tableSettings))
implicit val ec: ExecutionContextExecutor = system.dispatcher
done onComplete {
case Success(_) => println("Done"); system.terminate()
case Failure(err) => println(err.toString); system.terminate()
}
}
def hBaseConverter(msg: SampleMsg): Put = {
import scala.language.implicitConversions
implicit def toBytes(value: String): Array[Byte] = Bytes.toBytes(value)
val put = new Put(msg.id)
put.addColumn("data", "description", msg.description)
put
}
}
A sample application.conf file:
akka.kafka.consumer {
kafka-clients {
bootstrap.servers = "quickstart.cloudera:9092"
group.id = "test-group1"
# https://kafka.apache.org/documentation/#newconsumerconfigs
auto.offset.reset = "earliest"
}
}
IMPORTANT: If you want to implement at-least-once delivery you should use Consumer.committableSource and then manually commit the offsets. You can also use a pass-through graph (here an example) to split the offset from the message. See also: akka/alpakka-kafka#525
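One possible shape of the at-least-once variant is sketched below, reusing the consumerSettings, tableSettings, and SampleMsg defined in App.scala above (so this fragment is not standalone, and it needs an implicit ExecutionContext in scope). Writing each message through the sink individually is deliberately naive; in practice you would batch writes and carry the offset through a pass-through flow.

```scala
import akka.stream.scaladsl.{Sink, Source}

// At-least-once sketch: write the message to HBase first, and commit the
// Kafka offset only after the write has completed. mapAsync(1) keeps the
// commits in order.
val done = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("sample-msg"))
  .mapAsync(1) { msg =>
    Source
      .single(SampleMsg(msg.record.key(), msg.record.value()))
      .runWith(HTableStage.sink[SampleMsg](tableSettings))
      .flatMap(_ => msg.committableOffset.commitScaladsl())
  }
  .runWith(Sink.ignore)
```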