Micronaut Kafka Streams Metrics
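
This gist makes Kafka Streams client metrics visible to Micrometer in a Micronaut application. It consists of four Kotlin files: a factory that replaces Micronaut's default KafkaStreams bean so streams are built with a custom client supplier; a KafkaClientSupplier that creates the streams-internal producer and consumers through the bean context, so they pick up Micronaut's Kafka configuration; a MetricsReporter/MeterBinder bridge that registers each Kafka metric with every known MeterRegistry under the kafka.streams prefix; and a BeanCreatedEventListener that wires the reporter into every Kafka Streams configuration.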
// KafkaStreamsFactoryMetricsOverride.kt
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.configuration.kafka.streams.event.AfterKafkaStreamsStart
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart
import io.micronaut.context.annotation.Context
import io.micronaut.context.annotation.EachBean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.event.ApplicationEventPublisher
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.KStream
import java.io.Closeable
import java.time.Duration
import java.util.*
import javax.annotation.PreDestroy
import javax.inject.Inject

/**
 * Replaces Micronaut's default KafkaStreams factory so that each streams
 * instance is built with [MicronautKafkaClientSupplier], routing the
 * streams-internal clients through the bean context.
 */
@Factory
class KafkaStreamsFactoryMetricsOverride @Inject constructor(
    private val eventPublisher: ApplicationEventPublisher
) : Closeable {
    private val streams = Collections.synchronizedList(mutableListOf<KafkaStreams>())

    @Replaces(KafkaStreams::class)
    @EachBean(ConfiguredStreamBuilder::class)
    @Context
    fun kafkaStreams(
        clientSupplier: MicronautKafkaClientSupplier,
        builder: ConfiguredStreamBuilder,
        vararg kStreams: KStream<*, *>?
    ): KafkaStreams {
        val kafkaStreams = KafkaStreams(
            builder.build(builder.configuration),
            builder.configuration,
            clientSupplier
        )
        // Mirror the lifecycle events the default factory publishes.
        eventPublisher.publishEvent(BeforeKafkaStreamStart(kafkaStreams, kStreams))
        streams.add(kafkaStreams)
        kafkaStreams.start()
        eventPublisher.publishEvent(AfterKafkaStreamsStart(kafkaStreams, kStreams))
        return kafkaStreams
    }

    @PreDestroy
    override fun close() {
        for (stream in streams) {
            stream.close(Duration.ofSeconds(3))
        }
    }
}
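
For context, the factory above is invoked once per ConfiguredStreamBuilder bean. A minimal stream definition that would feed it might look like the sketch below; the factory class, bean name, and topic names are illustrative, and it assumes default key/value serdes are configured for the application.

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import org.apache.kafka.streams.kstream.KStream
import javax.inject.Named
import javax.inject.Singleton

// Hypothetical topology; "input-topic" and "output-topic" are placeholders.
@Factory
class ExampleStream {
    @Singleton
    @Named("example")
    fun exampleStream(builder: ConfiguredStreamBuilder): KStream<ByteArray, ByteArray> {
        val source: KStream<ByteArray, ByteArray> = builder.stream("input-topic")
        source.to("output-topic")
        return source
    }
}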
// MicronautKafkaClientSupplier.kt
import io.micronaut.configuration.kafka.config.*
import io.micronaut.context.BeanContext
import io.micronaut.context.annotation.Prototype
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.streams.KafkaClientSupplier
import java.util.*

/**
 * A [KafkaClientSupplier] that creates the streams-internal producer and
 * consumers through the Micronaut bean context instead of instantiating them
 * directly, so they inherit any bean-level Kafka configuration. A named
 * configuration (e.g. "kafka-streams-producer") is used when present, falling
 * back to the default configuration bean.
 */
@Prototype
class MicronautKafkaClientSupplier(private val context: BeanContext) : KafkaClientSupplier {

    override fun getProducer(config: MutableMap<String, Any>) =
        context.createBean(
            KafkaProducer::class.java,
            resolveConfig<AbstractKafkaProducerConfiguration<ByteArray, ByteArray>>("kafka-streams-producer") {
                it +
                    mapOf(
                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java
                    ) +
                    config.filterValues { it != null }
            }) as Producer<ByteArray, ByteArray>

    private fun buildConsumer(name: String, config: Map<String, Any>) =
        context.createBean(
            KafkaConsumer::class.java,
            resolveConfig<AbstractKafkaConsumerConfiguration<ByteArray, ByteArray>>(name) {
                it +
                    mapOf(
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java
                    ) +
                    config.filterValues { it != null }
            }) as KafkaConsumer<ByteArray, ByteArray>

    override fun getConsumer(config: Map<String, Any>): Consumer<ByteArray, ByteArray> =
        buildConsumer("kafka-streams-consumer", config)

    override fun getRestoreConsumer(config: Map<String, Any>): Consumer<ByteArray, ByteArray> =
        buildConsumer("kafka-streams-restore-consumer", config)

    override fun getGlobalConsumer(config: Map<String, Any>): Consumer<ByteArray, ByteArray> =
        buildConsumer("kafka-streams-global-consumer", config)

    override fun getAdmin(config: Map<String, Any>): Admin = AdminClient.create(config)

    // Looks up a Kafka configuration bean by name (falling back to the
    // unqualified bean) and rewrites its properties with the update function.
    private inline fun <reified T : AbstractKafkaConfiguration<*, *>> resolveConfig(
        name: String,
        update: (Map<Any, Any>) -> Map<Any, Any>
    ) =
        context.findBean(T::class.java, Qualifiers.byName(name))
            .or { context.findBean(T::class.java) }
            .orElseThrow()
            .also {
                val original = it.config.clone() as Properties
                it.config.clear()
                it.config.putAll(update(original))
            }
}
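
The name passed to resolveConfig is a Micronaut bean qualifier, so a producer or consumer configuration can be targeted at the streams-internal clients specifically. Below is a minimal sketch of exercising the lookup, assuming Micronaut's usual kafka.* property convention for named producer configurations; the property values and the standalone main are illustrative, not part of the gist.

import io.micronaut.context.ApplicationContext

fun main() {
    // Assumed property layout: kafka.producers.<name>.* defines a named producer configuration.
    val context = ApplicationContext.run(
        mapOf(
            "kafka.bootstrap.servers" to "localhost:9092",
            "kafka.producers.kafka-streams-producer.retries" to 3
        )
    )
    val supplier = context.getBean(MicronautKafkaClientSupplier::class.java)
    // The streams-internal producer inherits the named configuration above.
    val producer = supplier.getProducer(mutableMapOf())
    producer.close()
    context.close()
}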
// StreamsKafkaMetricsReporter.kt
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.binder.MeterBinder
import io.micronaut.configuration.kafka.metrics.builder.KafkaMetricMeterTypeBuilder
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.metrics.MetricsReporter
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.function.Function

// Kafka instantiates MetricsReporter implementations reflectively, so the
// reporter cannot receive the MeterRegistry via injection; registries are
// shared through this static queue instead (populated via bindTo).
internal val REGISTRIES = ConcurrentLinkedQueue<MeterRegistry>()

private val ALLOWED_TAGS = setOf("client.id")

/**
 * Bridges Kafka Streams client metrics into Micrometer, registering each
 * metric under the "kafka.streams" prefix with every known registry.
 */
class StreamsKafkaMetricsReporter : MetricsReporter, MeterBinder {
    override fun configure(configs: MutableMap<String, *>?) = Unit

    override fun close() = Unit

    override fun init(metrics: MutableList<KafkaMetric>) {
        for (metric in metrics) {
            for (registry in REGISTRIES) {
                registerMetric(registry, metric)
            }
        }
    }

    override fun metricChange(metric: KafkaMetric) {
        for (registry in REGISTRIES) {
            registerMetric(registry, metric)
        }
    }

    override fun metricRemoval(metric: KafkaMetric?) = Unit

    private fun registerMetric(meterRegistry: MeterRegistry, metric: KafkaMetric) {
        KafkaMetricMeterTypeBuilder.newBuilder()
            .prefix("kafka.streams")
            .metric(metric)
            .tagFunction(getTagFunction())
            .registry(meterRegistry)
            .build()
    }

    // Keep only the allow-listed tag keys on each meter.
    private fun getTagFunction(): Function<MetricName, List<Tag>> = Function { metricName: MetricName ->
        metricName
            .tags()
            .filter { (key, _) -> key in ALLOWED_TAGS }
            .map { (k, v) -> Tag.of(k, v) }
    }

    override fun bindTo(registry: MeterRegistry) {
        if (!REGISTRIES.contains(registry)) {
            REGISTRIES.add(registry)
        }
    }
}
// StreamsMetrics.kt
import io.micrometer.core.instrument.MeterRegistry
import io.micronaut.configuration.kafka.streams.AbstractKafkaStreamsConfiguration
import io.micronaut.configuration.metrics.annotation.RequiresMetrics
import io.micronaut.context.BeanLocator
import io.micronaut.context.annotation.Context
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import org.apache.kafka.streams.StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG
import javax.annotation.PreDestroy
import javax.inject.Inject

/**
 * Wires [StreamsKafkaMetricsReporter] into every Kafka Streams configuration
 * and registers the application's MeterRegistry so the reporter can reach it.
 */
@RequiresMetrics
@Context
class StreamsMetrics @Inject constructor(
    private val locator: BeanLocator
) : BeanCreatedEventListener<AbstractKafkaStreamsConfiguration<*, *>>, AutoCloseable {

    override fun onCreated(event: BeanCreatedEvent<AbstractKafkaStreamsConfiguration<*, *>>): AbstractKafkaStreamsConfiguration<*, *> {
        locator.findBean(MeterRegistry::class.java).ifPresent { REGISTRIES += it }
        event.bean.config[METRIC_REPORTER_CLASSES_CONFIG] = StreamsKafkaMetricsReporter::class.qualifiedName
        return event.bean
    }

    @PreDestroy
    override fun close() {
        REGISTRIES.clear()
    }
}
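
With all four pieces in place, stream metrics should appear in the application's MeterRegistry under the kafka.streams prefix. A sketch of reading one back follows; the meter name is illustrative, since the exact set depends on which Kafka metrics get registered.

import io.micrometer.core.instrument.MeterRegistry
import javax.inject.Singleton

// Hypothetical probe bean; inspect registry.meters to see what was actually registered.
@Singleton
class StreamsMetricsProbe(private val registry: MeterRegistry) {
    fun processRate(): Double? =
        registry.find("kafka.streams.process.rate").gauge()?.value()
}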