Created
November 20, 2019 14:23
-
-
Save n1ko-w1ll/0a7e0f47a89bfe875341bd383859a9f5 to your computer and use it in GitHub Desktop.
Akka Streams snippet showing how to read batches of messages from Kafka and write their content to InfluxDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Committer settings built from `system` via `CommitterSettings.create`.
val committerSettings: CommitterSettings = CommitterSettings.create(system)

// Mutable holder for the Kafka consumer's `Consumer.Control`. It starts out as
// `Consumer.NoopControl` and is overwritten whenever the consumer source is materialized.
val innerControl: AtomicReference[Consumer.Control] =
  new AtomicReference[Consumer.Control](Consumer.NoopControl)
// Kafka -> InfluxDB pipeline wrapped in a restarting source.
//
// `onFailuresWithBackoff` (rather than `withBackoff`) is used on purpose: `withBackoff` also
// restarts the wrapped source when it *completes*, so a graceful `Control.shutdown()` would be
// followed by a fresh consumer after the backoff and the outer stream would never complete.
// Failures are still retried with a backoff between 3 and 30 seconds (20% random jitter).
val stream = RestartSource.onFailuresWithBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    // Pair the parsed content of each record with its committable offset. This is a purely
    // synchronous step, so a plain `map` with an explicit tuple is used.
    .map { msg =>
      (MessageJsonDeserializer.parseMessage(msg.record.value()), msg.committableOffset)
    }
    // Group the pairs so that several values can be written at once: a group is emitted when
    // `batchSize` elements have arrived or `batchDuration` has elapsed, whichever comes first.
    .groupedWithin(batchSize, batchDuration)
    .mapAsync(concurrency) { tuples =>
      // Split the group back into the parsed messages and their offsets.
      val (messages, offsets) = tuples.unzip
      // Flatten the messages into points and bulk-write them; only once the write has
      // succeeded are the offsets passed downstream as a single batch.
      database
        .bulkWrite(messages.flatMap(messageToPoints), precision = Precision.SECONDS)
        .map(_ => CommittableOffsetBatch(offsets))
    }
    // Side-effect out the `Control` materialized value, because it cannot be propagated
    // through the `RestartSource`; every (re)materialization overwrites the previous one.
    .mapMaterializedValue(c => innerControl.set(c))
    // Hand the offset batches to the committer flow.
    .via(Committer.flow(committerSettings))
}
// Start the pipeline; whatever the stream emits is discarded by `Sink.ignore`.
stream.runWith(Sink.ignore)

// Register a JVM shutdown hook (runs e.g. on SIGTERM) that asks the control currently held in
// `innerControl` to shut down and blocks for at most 10 seconds until that has completed.
sys.addShutdownHook {
  val shutdownCompleted = innerControl.get().shutdown()
  Await.result(shutdownCompleted, 10.seconds)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment