Skip to content

Instantly share code, notes, and snippets.

@secustor
Created November 2, 2022 18:32
Show Gist options
  • Save secustor/65db5ebd0a85449319ad0ea2736db7a7 to your computer and use it in GitHub Desktop.
Save secustor/65db5ebd0a85449319ad0ea2736db7a7 to your computer and use it in GitHub Desktop.
OTEL crashhandler
package frauddetectionservice
import org.apache.kafka.clients.consumer.ConsumerConfig.*
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import hipstershop.Demo.*
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import java.time.Duration.ofMillis
import java.util.Properties
import kotlin.random.Random
import kotlin.system.exitProcess
/** Name of the Kafka topic that `main` subscribes the consumer to. */
const val topic = "orders"
/** Value assigned to the consumer's `GROUP_ID_CONFIG` property in `main`. */
const val groupID = "frauddetectionservice"
/**
 * Entry point: consumes records from the [topic] Kafka topic and prints every decoded order.
 *
 * The broker address is read from the `KAFKA_SERVICE_ADDR` environment variable; when it is
 * not set, a message is printed and the process exits with status 1. Once the consumer is
 * subscribed, a [CrashReportingExceptionHandler] is installed on the current thread and the
 * topic is polled in an endless loop.
 *
 * While handling a record, a test exception is thrown whenever `Random.nextBoolean()` returns
 * true, which exercises the crash handler.
 *
 * @param args command-line arguments; not used.
 */
fun main(args: Array<String>) {
    // Bail out early when the broker address is missing.
    val brokers: String = System.getenv("KAFKA_SERVICE_ADDR") ?: run {
        println("KAFKA_SERVICE_ADDR is not supplied")
        exitProcess(1)
    }

    val consumerConfig = Properties().apply {
        put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(GROUP_ID_CONFIG, groupID)
        put(BOOTSTRAP_SERVERS_CONFIG, brokers)
    }

    val consumer = KafkaConsumer<String, ByteArray>(consumerConfig)
    consumer.subscribe(listOf(topic))

    // Installed only after the consumer is set up, so construction errors use the JVM default handling.
    Thread.currentThread().uncaughtExceptionHandler = CrashReportingExceptionHandler()

    var processed = 0L
    // `use` closes the consumer when the loop is left by an exception.
    consumer.use { kafka ->
        while (true) {
            for (record in kafka.poll(ofMillis(100))) {
                val order = OrderResult.parseFrom(record.value())
                if (Random.nextBoolean()) {
                    throw Exception("This is a test exception")
                }
                processed += 1
                println("Consumed record with key ${record.key()} and value $order, and updated total count to $processed")
            }
        }
    }
}
/**
 * Uncaught-exception handler that reports a crash on the current OpenTelemetry span.
 *
 * Installing a per-thread handler replaces the JVM's default handling, which is what normally
 * prints the stack trace. This handler therefore writes the thread name and stack trace to
 * standard error itself before touching the span, so a crash is never silent.
 *
 * NOTE(review): whether a span is still current when this handler runs depends on how the
 * surrounding instrumentation scopes its spans — confirm. Without a valid current span the
 * span calls below presumably have no effect.
 */
class CrashReportingExceptionHandler : Thread.UncaughtExceptionHandler {
    /**
     * Prints the failure to standard error, records it on the current span with test
     * attributes, marks the span as failed and ends it.
     *
     * @param t the thread that is terminating; only its name is used, for the printed header.
     * @param e the uncaught throwable; a placeholder exception with message "Unknown" is
     *   substituted when it is null.
     */
    override fun uncaughtException(t: Thread?, e: Throwable?) {
        val failure = e ?: java.lang.Exception("Unknown")

        // Print first: the diagnostic must survive even if the telemetry calls below fail.
        System.err.print("Exception in thread \"${t?.name ?: "unknown"}\" ")
        failure.printStackTrace()

        val crashAttributes = Attributes.of(
            AttributeKey.stringKey("testKey"), "testValue",
            AttributeKey.stringArrayKey("attributeArrayKey"), listOf("testArrayValue1", "testArrayValue2")
        )

        val span = Span.current()
        span.recordException(failure, crashAttributes)
        span.setStatus(StatusCode.ERROR)
        span.end()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment