@davideicardi · Last active June 26, 2018
Alpakka HBase connector, write Akka messages to HBase

Alpakka HBase connector

In this example we create an Akka Streams Sink that writes messages to HBase.

Official documentation: https://developer.lightbend.com/docs/alpakka/current/hbase.html

Add the following dependencies:

  • "com.lightbend.akka" %% "akka-stream-alpakka-hbase" % "0.19"
  • "com.typesafe.akka" %% "akka-stream-kafka" % "0.19" (needed for the Kafka consumer used below)

Let's say you have a Kafka topic sample-msg whose messages have a String key and a String value, and you want to write them to an HBase table called messages, using the message key as the row key and storing the message value in a description column in the data column family. In a real-world scenario you will probably want to deserialize the message value into a more complex object and map it to one or more columns.
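As a sketch of that last point, suppose the value were a pipe-separated "description|severity" string (the RichMsg type, the parse helper, and the "|" format here are all hypothetical, only for illustration; they are not part of the connector API):

```scala
// Hypothetical richer message: row key from the Kafka key,
// plus two fields parsed out of the Kafka value.
case class RichMsg(key: String, description: String, severity: String)

// Decode a "description|severity" value, if well-formed.
def parse(key: String, value: String): Option[RichMsg] =
  value.split('|') match {
    case Array(description, severity) => Some(RichMsg(key, description, severity))
    case _                            => None
  }
```

Each field of RichMsg could then be mapped to its own column in the converter function (see hBaseConverter below for the single-column case).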

The main App.scala file:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.scaladsl.HTableStage
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

case class SampleMsg(id: String, description: String)

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from hBaseWriter")

    implicit val system: ActorSystem = ActorSystem("hBaseWriter")
    implicit val materializer: Materializer = ActorMaterializer()

    // For now assume string, string inside kafka message
    val consumerSettings =
      ConsumerSettings[String, String](system, new StringDeserializer, new StringDeserializer)

    // table is automatically created the first time
    val tableSettings =
      HTableSettings(HBaseConfiguration.create(), TableName.valueOf("messages"), Seq("data"), hBaseConverter)

    val done = Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics("sample-msg"))
      .map(r => SampleMsg(r.key(), r.value()))
      // .map(m => { println(s"Importing $m"); m }) // just for debug!
      .runWith(HTableStage.sink[SampleMsg](tableSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done.onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }

  def hBaseConverter(msg: SampleMsg): Put = {
    import scala.language.implicitConversions
    implicit def toBytes(value: String): Array[Byte] = Bytes.toBytes(value)

    val put = new Put(msg.id)
    put.addColumn("data", "description", msg.description)
    put
  }
}
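HBaseConfiguration.create() picks up hbase-site.xml from the classpath. If you prefer to point at your cluster programmatically, you can set the standard HBase client properties on the configuration before passing it to HTableSettings (the hostname below is a placeholder, reusing the quickstart host from the application.conf example):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration

val conf = HBaseConfiguration.create()
// Standard HBase client properties; replace with your own cluster values.
conf.set("hbase.zookeeper.quorum", "quickstart.cloudera")
conf.set("hbase.zookeeper.property.clientPort", "2181")
```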

A sample application.conf file:

akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "quickstart.cloudera:9092"
    group.id = "test-group1"

    # https://kafka.apache.org/documentation/#newconsumerconfigs
    auto.offset.reset = "earliest"
  }
}

IMPORTANT: atMostOnceSource commits each offset before the message is processed, so a crash can lose messages. If you want at-least-once delivery you should use Consumer.committableSource and commit the offset manually after the write. You can also use a pass-through graph to carry the offset alongside the message through the HBase stage.
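A minimal sketch of the at-least-once variant, assuming the same consumerSettings and hBaseConverter as above (not runnable as-is: writeToHBase is a hypothetical helper wrapping the Put, and the commit call follows the alpakka-kafka 0.19 API):

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink

// Sketch only: commit each offset only AFTER the corresponding row has been
// written, so a crash replays the uncommitted messages (at-least-once).
val done = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("sample-msg"))
  .mapAsync(parallelism = 1) { msg =>
    val row = SampleMsg(msg.record.key(), msg.record.value())
    writeToHBase(row) // hypothetical helper returning a Future[Unit]
      .flatMap(_ => msg.committableOffset.commitScaladsl())(system.dispatcher)
  }
  .runWith(Sink.ignore)
```

In practice you would commit in batches (e.g. via Committer.sink in later alpakka-kafka versions) rather than per message, since a commit per record is slow.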

See also: akka/alpakka-kafka#525
