@n1ko-w1ll
Created November 20, 2019 14:23
Akka Streams snippet showing how to read messages from Kafka in batches and write their contents to InfluxDB.
// Assumes an implicit ActorSystem `system` (and, on Akka < 2.6, an implicit
// Materializer) plus `consumerSettings`, `topic`, `batchSize`, `batchDuration`,
// `concurrency`, `database` and `messageToPoints` already in scope.
import java.util.concurrent.atomic.AtomicReference

import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.scaladsl.{RestartSource, Sink}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val committerSettings = CommitterSettings.create(system)
val innerControl = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val stream = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    // pair each incoming message's parsed content with its committable offset
    .mapAsync(1) { msg =>
      Future.successful((MessageJsonDeserializer.parseMessage(msg.record.value()), msg.committableOffset))
    }
    // group the tuples so that we can write several values at once later
    .groupedWithin(batchSize, batchDuration)
    .mapAsync(concurrency) { tuples =>
      // split the batch of (message, offset) tuples into two sequences
      val (messages, offsets) = tuples.unzip
      // flatten the messages into points and write them to InfluxDB...
      database
        .bulkWrite(messages.flatMap(messageToPoints), precision = Precision.SECONDS)
        // ...then pass the offsets downstream as a single batch
        .map(_ => CommittableOffsetBatch(offsets))
    }
    // side effect out the `Control` materialized value because it
    // cannot be propagated through the `RestartSource`
    .mapMaterializedValue(c => innerControl.set(c))
    .via(Committer.flow(committerSettings))
}

stream.runWith(Sink.ignore)

// Shutdown hook: respond to SIGTERM by draining the consumer gracefully
sys.ShutdownHookThread {
  Await.result(innerControl.get.shutdown(), 10.seconds)
}
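For context, a minimal sketch of the definitions the snippet assumes to be in scope. All concrete values (broker address, group id, topic, batch parameters, database name) are placeholders; the Kafka settings use the Alpakka Kafka `ConsumerSettings` API, and the `database` handle is assumed to come from the scala-influxdb-client library, whose `bulkWrite` signature the snippet matches:

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import com.paulgoldbaum.influxdbclient.InfluxDB
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("kafka-to-influx")

// Placeholder tuning parameters; adjust for your deployment
val topic         = "measurements"
val batchSize     = 1000
val batchDuration = 5.seconds
val concurrency   = 2

// Kafka consumer configuration (key/value deserializer types are an
// assumption; MessageJsonDeserializer.parseMessage must accept the value type)
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("influx-writer")

// InfluxDB connection, assuming scala-influxdb-client
val influx   = InfluxDB.connect("localhost", 8086)
val database = influx.selectDatabase("metrics")
```

With these in place, the stream above can be materialized as shown; only `MessageJsonDeserializer` and `messageToPoints` remain application-specific.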