Skip to content

Instantly share code, notes, and snippets.

@juhoautio
Created January 24, 2019 10:00
Show Gist options
  • Save juhoautio/326acff2c34cd45a32af0a375257ba22 to your computer and use it in GitHub Desktop.
Save juhoautio/326acff2c34cd45a32af0a375257ba22 to your computer and use it in GitHub Desktop.
package com.quantifind.kafka.reporter.influxdb
import java.util.concurrent.TimeUnit
import com.quantifind.kafka.OffsetGetter
import com.quantifind.kafka.offsetapp.OffsetInfoReporter
import com.quantifind.kafka.reporter.influxdb.InfluxDBOffsetInfoReporter.pluginArgsToMap
import org.influxdb.dto.{BatchPoints, Point}
import org.influxdb.{InfluxDB, InfluxDBFactory}
/**
 * Reports Kafka consumer offset information to InfluxDB.
 *
 * Configuration is taken from `pluginsArgs`, a comma-separated list of `key=value` pairs parsed
 * by `InfluxDBOffsetInfoReporter.pluginArgsToMap`. The following keys are read:
 * `influxdbDatabase`, `influxdbMeasurement`, `influxdbHost`, `influxdbPort`, `influxdbUser`
 * and `influxdbPassword`.
 *
 * For backward compatibility the historical misspelling `infludxbMeasurement` is still accepted
 * when `influxdbMeasurement` is not supplied.
 *
 * On construction an InfluxDB client is obtained through `InfluxDBFactory.connect` and
 * `createDatabase` is invoked for the configured database.
 *
 * @param pluginsArgs comma-separated `key=value` configuration string
 * @throws NoSuchElementException if a required configuration key is missing
 */
class InfluxDBOffsetInfoReporter(pluginsArgs: String) extends OffsetInfoReporter {

  val argsMap: Map[String, String] = pluginArgsToMap(pluginsArgs)

  val database: String = argsMap("influxdbDatabase")

  // Prefer the correctly spelled key; fall back to the legacy misspelled key so that
  // existing configurations keep working. If neither is present the lookup of the legacy
  // key fails exactly as it did before.
  val measurement: String =
    argsMap.getOrElse("influxdbMeasurement", argsMap("infludxbMeasurement"))

  val influxDB: InfluxDB = InfluxDBFactory.connect(
    s"http://${argsMap("influxdbHost")}:${argsMap("influxdbPort")}",
    argsMap("influxdbUser"),
    argsMap("influxdbPassword"))

  influxDB.createDatabase(database)

  /**
   * Writes one point per entry of `offsetInfoSeq` to the configured measurement, as a single
   * batch. Every point in the batch carries the same timestamp.
   *
   * @param offsetInfoSeq offset information entries to report
   */
  override def report(offsetInfoSeq: IndexedSeq[OffsetGetter.OffsetInfo]): Unit = {
    // Important: use the same timestamp for all partitions.
    // Truncate the time to the minute, which is the interval used in Grafana.
    val minuteNow = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis)

    val builder = BatchPoints.database(database)
    offsetInfoSeq.foreach(offsetInfo => builder.point(toPoint(offsetInfo, minuteNow)))
    influxDB.write(builder.build())
  }

  /** Builds the InfluxDB point for a single offset entry, timestamped at `minuteNow` minutes. */
  private def toPoint(offsetInfo: OffsetGetter.OffsetInfo, minuteNow: Long): Point =
    Point.measurement(measurement)
      .time(minuteNow, TimeUnit.MINUTES)
      .tag("group", offsetInfo.group)
      .tag("topic", offsetInfo.topic)
      .tag("partition", offsetInfo.partition.toString)
      .addField("logSize", offsetInfo.logSize)
      .addField("offset", offsetInfo.offset)
      .addField("lag", offsetInfo.lag)
      // Last commit timestamp. No reasonable way to display it in Grafana, but it is added
      // for debugging anyway.
      .addField("modified", offsetInfo.modified.inMillis)
      .build()
}
object InfluxDBOffsetInfoReporter {

  /**
   * Parses a comma-separated list of `key=value` pairs into a map.
   *
   * Each entry is split on the first `=` only, so values may themselves contain `=`.
   * Because entries are separated by `,`, neither keys nor values can contain a comma.
   * Empty entries (for example from an empty string or a doubled comma) are ignored.
   * When a key occurs more than once, the last occurrence wins.
   *
   * @param pluginsArgs configuration string such as `"influxdbHost=localhost,influxdbPort=8086"`
   * @return the parsed key/value pairs
   * @throws IllegalArgumentException if a non-empty entry contains no `=`
   */
  def pluginArgsToMap(pluginsArgs: String): Map[String, String] =
    pluginsArgs
      .split(",")
      // An empty input or a doubled comma yields empty segments; skip them instead of failing.
      .filter(_.nonEmpty)
      .map { entry =>
        entry.split("=", 2) match {
          case Array(key, value) => key -> value
          case _ =>
            throw new IllegalArgumentException(
              s"Malformed plugin argument '$entry': expected key=value")
        }
      }
      .toMap
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment