Efficient Kafka Consumers workshop

Background / prep

Hello. This is a follow-up workshop to the Efficient Kafka Consumers presentation. If you haven't seen that presentation, this workshop may be difficult to follow. You can find the recording on Google Drive.

ZDI Setup / Verification

If you haven't already, you will need to install zdi.

Make sure zdi kafka is running.

In one terminal window run:

zdi kafka consume-topic kafkabus.heartbeat

In another terminal window run:

zdi kafka create-message kafkabus.heartbeat
hello

Your message should show in the consume-topic window.

Java / Scala setup

If you don't usually develop Scala applications, you will need to install Java / SBT.

brew tap AdoptOpenJDK/openjdk
brew cask install adoptopenjdk11
brew install sbt

Generating a stub consumer

These steps may take a while if you haven't done much Scala development before, as lots of dependencies are pulled from the internet.

Follow the instructions on https://github.com/zendesk/zendesk-scala-api-template.g8/blob/master/README.md to generate a project with a Kafka consumer.

Run the project with sbt run (full instructions in template project).

In the terminal window you are using to publish messages, publish another message with zdi kafka create-message kafkabus.heartbeat. You should now see that message displayed by your consumer.

We will go through the following in the workshop.

Getting familiar with the consumer

Open your project in IntelliJ (the Community Edition is fine).

Have a look at:

  • MyKafkaConsumer - the Kafka consumer you can start to tweak
  • build.sbt - to see what dependencies this project has

Extending the consumer

If you are new to Scala, don't worry if you don't get this far.

Before you start, commit the template-generated code so that if you make a mistake you can easily revert:

git init
git add .
git commit -m "Project generated from template"

These imports may help with completing the exercises:

import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
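
As a rough illustration of what these give you (the names below are invented for illustration, not part of the template): scala.concurrent.duration._ enables the 5.milliseconds style duration syntax used later, and the global Monix Scheduler also acts as the implicit ExecutionContext that Future combinators need.

import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Future

object ImportsDemo extends App {
  // FiniteDuration values via the duration DSL, e.g. the batching window used later in this workshop
  val window: FiniteDuration = 5.milliseconds

  // the Monix Scheduler extends ExecutionContext, so Future combinators pick it up implicitly
  val upperCased: Future[String] = Future("hello").map(_.toUpperCase)

  println(window)
}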

Saving to an external state store

Copy ElasticSearch.scala (the file at the bottom of this gist) into your project. Instead of just printing the values in MyKafkaConsumer, try saving them to the very fake ElasticSearch.insertOrUpdate. You will need to make use of the mapAsync operation. Start with a parallelism of 1.
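
The exact stream type exposed by the template's MyKafkaConsumer may differ, so treat the following as a minimal sketch rather than the template's API: it assumes you can obtain a Monix Observable of the message values (the messages value and SaveToEsDemo object are invented for illustration). mapEval plays the same role as mapAsync with a parallelism of 1: each insert's Future must complete before the next message is processed.

import com.zendesk.zts.search.ElasticSearch
import monix.eval.Task
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global

object SaveToEsDemo extends App {
  // hypothetical stand-in for the stream of message values your consumer produces
  val messages: Observable[String] = Observable.fromIterable((1 to 10).map(i => s"doc-$i"))

  // one insert at a time: the Future from insertOrUpdate completes before the next element is pulled
  val pipeline: Task[Unit] =
    messages
      .mapEval(value => Task.deferFuture(ElasticSearch.insertOrUpdate(value)))
      .completedL

  pipeline.runSyncUnsafe()
}

When wiring this into MyKafkaConsumer, the only change relative to the printing version should be that single mapAsync (here mapEval) stage.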

Run your updated consumer. Now publish a lot of messages to the topic and see how the consumer performs (it logs every 1000 messages inserted).

zdi kafka shell
cd /opt/kafka/bin
./kafka-producer-perf-test.sh --topic kafkabus.heartbeat --throughput 10000 --record-size 10 --num-records 10000 --producer-props bootstrap.servers=kafka.docker:9092

This will output something like:

10000 records sent, 9746.588694 records/sec (0.09 MB/sec), 6.25 ms avg latency, 298.00 ms max latency, 5 ms 50th, 15 ms 95th, 35 ms 99th, 44 ms 99.9th.

Now look at how long your consumer took. You will see some log statements like:

{"@timestamp":"2020-06-16T23:35:31.147Z","@version":"1","message":"ES index contains 1000 documents","logger_name":"com.zendesk.zts.search.ElasticSearch$","thread_name":"scala-execution-context-global-18","level":"INFO","level_value":20000,"zendesk_pod":"unknown-pod"}
{"@timestamp":"2020-06-16T23:35:33.727Z","@version":"1","message":"ES index contains 2000 documents","logger_name":"com.zendesk.zts.search.ElasticSearch$","thread_name":"scala-execution-context-global-18","level":"INFO","level_value":20000,"zendesk_pod":"unknown-pod"}

Note that there are a couple of seconds between those log statements. That lines up with the simulation: at a parallelism of 1, 1000 inserts at 2 ms each adds up to roughly 2 seconds of simulated latency alone.

Consider committing before continuing.

Batching

Elasticsearch is more efficient with batches. From the bulk API documentation (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html):

"Performs multiple indexing or delete operations in a single API call. This reduces overhead and can greatly increase indexing speed."

Switch to using ElasticSearch.bulkInsertOrUpdate with batches of 50 records groupedWithin 5.milliseconds.
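
Again as a hedged sketch against a plain Monix Observable (names here are illustrative, not the template's API): Monix's bufferTimedAndCounted groups elements by time window and maximum batch size, which is the role groupedWithin plays in this exercise.

import com.zendesk.zts.search.ElasticSearch
import monix.eval.Task
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.duration._

object BulkSaveDemo extends App {
  // hypothetical stand-in for the stream of message values your consumer produces
  val messages: Observable[String] = Observable.fromIterable((1 to 200).map(i => s"doc-$i"))

  val pipeline: Task[Unit] =
    messages
      // emit a batch when 50 records have arrived or 5 ms have elapsed, whichever comes first
      .bufferTimedAndCounted(5.milliseconds, 50)
      // one bulk call per batch instead of one call per record
      .mapEval(batch => Task.deferFuture(ElasticSearch.bulkInsertOrUpdate(batch)))
      .completedL

  pipeline.runSyncUnsafe()
}

With batching, each 5 ms bulk call can cover up to 50 documents instead of one 2 ms call per document, which is the difference the log timestamps should make visible.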

Rerun the consumer and do another bulk publish of messages. Compare the performance to before using the log output.

ElasticSearch.scala

package com.zendesk.zts.search

import com.typesafe.scalalogging.StrictLogging
import monix.execution.Scheduler.Implicits.global
import monix.eval.Task
import monix.execution.atomic.AtomicInt

import scala.concurrent.duration._
import scala.concurrent.Future

/**
 * Simulates each insert takes 2 milliseconds (network latency etc). Batches take 5 ms (same network latency and slightly more processing to do).
 */
object ElasticSearch extends StrictLogging {
  private val docsInserted = AtomicInt(0)

  def insertOrUpdate(document: String): Future[Unit] = {
    simulateDelay(2.milliseconds)(List(document))
  }

  def bulkInsertOrUpdate(documents: Seq[String]): Future[Unit] = {
    simulateDelay(5.milliseconds)(documents)
  }

  private def simulateDelay(duration: FiniteDuration)(documents: Seq[String]): Future[Unit] = {
    Task {
      // nothing - just pretending
      ()
    }
      .delayExecution(duration)
      .flatMap { _ =>
        val indexSize = docsInserted.incrementAndGet(documents.length)
        if (indexSize % 1000 == 0) {
          Task(logger.info(s"ES index contains $indexSize documents"))
        } else {
          Task.now(())
        }
      }
      .runToFuture
  }
}