Last active June 26, 2018 13:54
Alpakka HBase connector, write Akka messages to HBase

Alpakka HBase connector

In this example we create an Akka Sink that writes messages to HBase.

Official documentation:

Add the following dependency (the Kafka source used below additionally requires akka-stream-kafka):

  • "com.lightbend.akka" %% "akka-stream-alpakka-hbase" % "0.19"

Let's say you have a Kafka topic sample-msg whose messages have a String key and a String value, and you want to write them to an HBase table called messages, using the message key as the row key and storing the message value in the description column of the data column family. In a real-world scenario you will probably want to deserialize the message value into a more complex object and map it to one or more columns.
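As a rough sketch of that richer mapping, assume (hypothetically) that the message value is a comma-separated string such as description,author,priority. The value could then be parsed into a case class and expanded into one cell per field. RichMsg, parse and toCells are illustrative names that are not part of the original example; each resulting triple would become one put.addColumn(family, qualifier, value) call in the converter.

```scala
// Hypothetical richer message: the field names and the comma-separated
// wire format are assumptions made for illustration only.
final case class RichMsg(id: String, description: String, author: String, priority: Int)

object RichMsg {
  // Parse a Kafka key/value pair; returns None when the value is malformed.
  def parse(key: String, value: String): Option[RichMsg] =
    value.split(",", -1) match {
      case Array(description, author, priority) =>
        scala.util.Try(priority.trim.toInt).toOption
          .map(p => RichMsg(key, description, author, p))
      case _ => None
    }

  // One (family, qualifier, value) triple per HBase cell to write.
  def toCells(msg: RichMsg): List[(String, String, String)] =
    List(
      ("data", "description", msg.description),
      ("data", "author", msg.author),
      ("data", "priority", msg.priority.toString)
    )
}
```

Keeping the parsing and the cell layout free of HBase types makes them easy to unit test; the converter then only has to turn the triples into bytes.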

The main App.scala file:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.scaladsl.HBaseFlow
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

case class SampleMsg(id: String, description: String)

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from hBaseWriter")

    implicit val system: ActorSystem = ActorSystem("hBaseWriter")
    implicit val materializer: Materializer = ActorMaterializer()

    // For now assume string, string inside kafka message
    val consumerSettings =
      ConsumerSettings[String, String](system, new StringDeserializer, new StringDeserializer)

    // table is automatically created the first time
    val tableSettings =
      HTableSettings(HBaseConfiguration.create(), TableName.valueOf("messages"), Seq("data"), hBaseConverter)

    // Read from Kafka, convert each record and write it to HBase
    val done = Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics("sample-msg"))
      .map(r => SampleMsg(r.key(), r.value()))
      // .map(m => { println(s"Importing $m"); m }) // just for debug!
      .runWith(HBaseFlow.sink(tableSettings))

    // Shut down the actor system when the stream completes or fails
    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }

  // Row key = message key; message value goes to column data:description
  def hBaseConverter(msg: SampleMsg): Put = {
    import scala.language.implicitConversions
    implicit def toBytes(value: String): Array[Byte] = Bytes.toBytes(value)

    val put = new Put(msg.id)
    put.addColumn("data", "description", msg.description)
    put
  }
}

A sample application.conf file:

akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "quickstart.cloudera:9092"
    group.id = "test-group1"

    auto.offset.reset = "earliest"
  }
}

IMPORTANT: This example uses at-most-once delivery. If you want at-least-once delivery, use Consumer.committableSource and commit the offset manually after the write. You can also use a pass-through graph (here is an example) to split the offset from the message.
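One possible way to get at-least-once delivery without a separate pass-through graph is to carry the offset alongside the message through the HBase flow and commit it afterwards. The following is only a sketch under assumptions: it relies on HBaseFlow.flow emitting each element after it has been written (as in the 0.19 connector) and on an akka-stream-kafka version that provides commitScaladsl. AtLeastOnceApp, Envelope and envelopeConverter are illustrative names, and SampleMsg is redefined inside the object only to keep the sketch self-contained.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.scaladsl.HBaseFlow
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.immutable.Seq

object AtLeastOnceApp {
  final case class SampleMsg(id: String, description: String)

  // Hypothetical envelope: the message plus the offset to commit after the write.
  type Envelope = (SampleMsg, CommittableOffset)

  // Builds the Put from the message only; the offset is ignored here.
  def envelopeConverter(env: Envelope): Put = {
    val (msg, _) = env
    new Put(Bytes.toBytes(msg.id))
      .addColumn(Bytes.toBytes("data"), Bytes.toBytes("description"), Bytes.toBytes(msg.description))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("hBaseWriter")
    implicit val materializer: Materializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings[String, String](system, new StringDeserializer, new StringDeserializer)

    val tableSettings =
      HTableSettings[Envelope](HBaseConfiguration.create(), TableName.valueOf("messages"), Seq("data"), envelopeConverter)

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("sample-msg"))
      .map(m => (SampleMsg(m.record.key(), m.record.value()), m.committableOffset))
      .via(HBaseFlow.flow(tableSettings)) // element is emitted once the Put was issued
      .mapAsync(1)(_._2.commitScaladsl()) // commit the offset only after the write
      .runWith(Sink.ignore)
  }
}
```

Because the offset is committed only after the write, a crash between the two steps causes the record to be read and written again on restart. Writing the same row key and column twice overwrites the cell, so the duplicate is harmless in this example.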

See also: akka/alpakka-kafka#525

