Hello. This is a follow-up workshop to the Efficient Kafka Consumers presentation. If you haven't seen that presentation, this workshop may be difficult to follow. You can find the recording on Google Drive.
If you haven't already, you will need to install zdi. Make sure zdi kafka is running.
In one terminal window run:
zdi kafka consume-topic kafkabus.heartbeat
In another terminal window run:
zdi kafka create-message kafkabus.heartbeat
hello
Your message should appear in the consume-topic window.
If you don't usually develop Scala applications, you will need to install Java and sbt:
brew tap AdoptOpenJDK/openjdk
brew cask install adoptopenjdk11
brew install sbt
These steps may take a while if you haven't done much Scala development before, as many dependencies are pulled from the internet.
Follow the instructions on https://github.com/zendesk/zendesk-scala-api-template.g8/blob/master/README.md to generate a project with a Kafka consumer.
Run the project with sbt run (full instructions are in the template project).
In the terminal window you are using to publish messages, publish another message with zdi kafka create-message kafkabus.heartbeat. You should now see that message displayed by your consumer.
Open your project in IntelliJ IDEA (Community Edition is fine).
Have a look at:
MyKafkaConsumer - the Kafka consumer you can start to tweak
build.sbt - to see what dependencies this project has
If you are new to Scala, don't worry if you don't get this far.
Before you start, commit the template-generated code so that if you make a mistake you can easily revert:
git init
git add .
git commit -m "Project generated from template"
These imports may help with completing the exercises:
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
Copy ElasticSearch.scala into your project. Instead of just printing the values in MyKafkaConsumer, try saving them to the very fake ElasticSearch.insertOrUpdate. You will need to make use of the mapAsync operation. Start with a parallelism of 1.
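The shape of that step can be sketched with plain scala.concurrent.Futures. Note this is only an illustration of the ordering you get with a parallelism of 1, not the actual stream code; insertOrUpdate below is a hypothetical stand-in for the workshop's ElasticSearch.insertOrUpdate.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapAsyncSketch extends App {
  // Hypothetical stand-in for the workshop's ElasticSearch.insertOrUpdate
  def insertOrUpdate(value: String): Future[Unit] =
    Future { println(s"saved $value") }

  val messages = List("a", "b", "c")

  // With a parallelism of 1, each async save completes before the next
  // one starts - the same one-at-a-time behaviour mapAsync(1) gives you
  // on the consumer stream.
  val done = messages.foldLeft(Future.unit) { (acc, msg) =>
    acc.flatMap(_ => insertOrUpdate(msg))
  }

  Await.result(done, 10.seconds)
}
```

Once this works, you can experiment with raising the parallelism and watching how throughput changes.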
Run your updated consumer. Now publish a lot of messages to the topic and see how the consumer performs (it logs every 1000 messages inserted).
zdi kafka shell
cd /opt/kafka/bin
./kafka-producer-perf-test.sh --topic kafkabus.heartbeat --throughput 10000 --record-size 10 --num-records 10000 --producer-props bootstrap.servers=kafka.docker:9092
This will output something like:
10000 records sent, 9746.588694 records/sec (0.09 MB/sec), 6.25 ms avg latency, 298.00 ms max latency, 5 ms 50th, 15 ms 95th, 35 ms 99th, 44 ms 99.9th.
Now look at how long your consumer took. You will see some log statements like:
{"@timestamp":"2020-06-16T23:35:31.147Z","@version":"1","message":"ES index contains 1000 documents","logger_name":"com.zendesk.zts.search.ElasticSearch$","thread_name":"scala-execution-context-global-18","level":"INFO","level_value":20000,"zendesk_pod":"unknown-pod"}
{"@timestamp":"2020-06-16T23:35:33.727Z","@version":"1","message":"ES index contains 2000 documents","logger_name":"com.zendesk.zts.search.ElasticSearch$","thread_name":"scala-execution-context-global-18","level":"INFO","level_value":20000,"zendesk_pod":"unknown-pod"}
Note that there is a gap of a couple of seconds between those log statements.
Consider committing before continuing.
Elasticsearch is more efficient with batches. From the bulk API documentation (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html): "Performs multiple indexing or delete operations in a single API call. This reduces overhead and can greatly increase indexing speed."
Switch to using ElasticSearch.bulkInsertOrUpdate, batching with groupedWithin into groups of 50 records within 5.milliseconds.
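Conceptually, groupedWithin turns the stream of single records into a stream of batches, emitting a batch when either the count is reached or the time window elapses. The size-based half of that behaviour can be sketched with the standard library's grouped; bulkInsertOrUpdate below is a hypothetical stand-in for the workshop's method.

```scala
object BatchingSketch extends App {
  // Hypothetical stand-in for the workshop's ElasticSearch.bulkInsertOrUpdate
  def bulkInsertOrUpdate(batch: Seq[String]): Unit =
    println(s"bulk insert of ${batch.size} records")

  // Simulated messages; in the workshop these come from the Kafka consumer.
  val messages = (1 to 120).map(i => s"msg-$i")

  // groupedWithin(50, 5.milliseconds) would emit a batch when 50 records
  // have arrived OR 5 ms have passed, whichever comes first - the time
  // window keeps a slow topic from stalling. Here we sketch only the
  // size-based grouping.
  messages.grouped(50).foreach(bulkInsertOrUpdate)
}
```

Each bulk call then carries up to 50 records to Elasticsearch in one round trip instead of 50 separate ones.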
Rerun the consumer and do another bulk publish of messages. Compare the performance to before using the log output.