
Alpakka Kafka connector (akka-stream-kafka) example

A simple example showing how to use the Alpakka Kafka connector to produce and consume Kafka messages.

I assume that you have two Scala apps: a producer and a consumer.

Producer

Add the following dependencies:

  • "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Create an application.conf:

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  # parallelism = 100

  # How long to wait for `KafkaProducer.close`
  # close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "quickstart.cloudera:9092"
  }
}
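
The same settings can also be applied in code instead of (or in addition to) application.conf; for example, a sketch using the withBootstrapServers helper on ProducerSettings:

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("quickstart.cloudera:9092")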

Create a producer.scala:

package fakeProducer

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from producer")

    implicit val system: ActorSystem = ActorSystem("producer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)

    val done: Future[Done] =
      Source(1 to 100)
        .map(value => new ProducerRecord[String, String]("test-topic", "msg " + value))
        .runWith(Producer.plainSink(producerSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done.onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
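
Note that the records above are sent without a key, so the Kafka client spreads them across partitions. If all messages for the same logical entity must land on the same partition, pass a key via the three-argument ProducerRecord constructor; a minimal sketch (the key scheme here is arbitrary):

Source(1 to 100)
  .map(value => new ProducerRecord[String, String]("test-topic", (value % 10).toString, "msg " + value))
  .runWith(Producer.plainSink(producerSettings))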

Consumer

Add the following dependencies:

  • "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"

Create an application.conf file:

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked.
  # poll-timeout = 50ms

  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  # stop-timeout = 30s

  # How long to wait for `KafkaConsumer.close`
  # close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `CommitTimeoutException`.
  # commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  # commit-time-warning = 1s

  # If for any reason `KafkaConsumer.poll` blocks for longer than the configured
  # poll-timeout then it is forcefully woken up with `KafkaConsumer.wakeup`.
  # The KafkaConsumerActor will throw
  # `org.apache.kafka.common.errors.WakeupException` which will be ignored
  # until `max-wakeups` limit gets exceeded.
  # wakeup-timeout = 60s

  # After exceeding the maximum number of wakeups, the consumer will stop and the stage will fail.
  # Setting it to 0 will let it ignore the wakeups and try to get the polling done forever.
  # max-wakeups = 10

  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  # commit-refresh-interval = infinite

  # If enabled, log stack traces before waking up the KafkaConsumer to give
  # some indication why the KafkaConsumer is not honouring the `poll-timeout`
  # wakeup-debug = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  # use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Alpakka disables auto-commit by default; it is enabled here because this
    # example uses plainSource. With enable.auto.commit, offsets are committed
    # automatically with a frequency controlled by auto.commit.interval.ms.
    enable.auto.commit = true

    bootstrap.servers = "quickstart.cloudera:9092"
    group.id = "test-group1"

    auto.offset.reset = "earliest"
  }

  # Time to wait for pending requests when a partition is closed
  # wait-close-partition = 500ms
}
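
As with the producer, these settings can also be applied in code on the ConsumerSettings instance; a sketch using its withX helpers:

import org.apache.kafka.clients.consumer.ConsumerConfig

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("quickstart.cloudera:9092")
    .withGroupId("test-group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")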

Create a consumer.scala file:

package fakeConsumer

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object App {
  def main(args: Array[String]): Unit = {
    println("Hello from hBaseWriter")

    implicit val system: ActorSystem = ActorSystem("consumer-sample")
    implicit val materializer: Materializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)

    val done = Consumer
      .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
      .runWith(Sink.foreach(println)) // just print each message for debugging

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done.onComplete {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }
  }
}
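
With plainSource and enable.auto.commit = true, offsets are committed on a timer, possibly before a message has actually been processed, so messages can be lost if the app crashes mid-stream. For at-least-once semantics, disable auto-commit and instead commit each offset after processing with Consumer.committableSource; a minimal sketch:

val done = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("test-topic"))
  .mapAsync(1) { msg =>
    println(msg.record.value) // process the message, then commit its offset
    msg.committableOffset.commitScaladsl()
  }
  .runWith(Sink.ignore)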

To correctly handle Ctrl+C, or to shut down gracefully when your app is hosted inside a container, you can use the following code:

// additional imports needed for this variant
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Keep

// retrieve the control object
val control = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("test-topic"))
  .toMat(Sink.foreach(println))(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()

// Correctly handle Ctrl+C and docker container stop
sys.addShutdownHook({
  println("Shutdown requested...")

  val done = control.shutdown()

  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done
    .onComplete {
      case Success(_) => println("Exiting ...")
      case Failure(err) => println("Error: " + err.toString)
    }
})
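
Note that a JVM shutdown hook runs on its own thread and the process exits once all hooks return, so the onComplete callback above may never get a chance to run. Alternatively you can block inside the hook until the consumer has stopped; a minimal sketch (the 30 second timeout is arbitrary):

import scala.concurrent.Await
import scala.concurrent.duration._

sys.addShutdownHook {
  println("Shutdown requested...")
  Await.result(control.shutdown(), 30.seconds)
  println("Exiting ...")
}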

See also

  • Alpakka Kafka connector documentation: https://doc.akka.io/docs/alpakka-kafka/current/
