Created
November 2, 2022 18:32
-
-
Save secustor/65db5ebd0a85449319ad0ea2736db7a7 to your computer and use it in GitHub Desktop.
OTEL crashhandler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package frauddetectionservice | |
import org.apache.kafka.clients.consumer.ConsumerConfig.* | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer | |
import hipstershop.Demo.* | |
import io.opentelemetry.api.common.AttributeKey | |
import io.opentelemetry.api.common.Attributes | |
import io.opentelemetry.api.trace.Span | |
import io.opentelemetry.api.trace.StatusCode | |
import java.time.Duration.ofMillis | |
import java.util.Properties | |
import kotlin.random.Random | |
import kotlin.system.exitProcess | |
/** Name of the Kafka topic this service subscribes to. */
const val topic: String = "orders"

/** Kafka consumer group id used by this service. */
const val groupID: String = "frauddetectionservice"
/**
 * Entry point of the fraud-detection consumer.
 *
 * Reads the Kafka bootstrap address from the `KAFKA_SERVICE_ADDR` environment
 * variable (printing a message and exiting with status 1 when it is absent),
 * subscribes to [topic] as part of consumer group [groupID], and then polls
 * forever. Each record value is parsed as an `OrderResult` and counted.
 *
 * For crash-handler testing, processing a record throws a test exception at
 * random (`Random.nextBoolean()`); that exception is left uncaught on purpose so
 * it reaches the [CrashReportingExceptionHandler].
 *
 * @param args command-line arguments; not used.
 */
fun main(args: Array<String>) {
    // Install the crash reporter before anything that can throw, so that failures
    // while constructing or subscribing the consumer are reported as well.
    Thread.currentThread().uncaughtExceptionHandler = CrashReportingExceptionHandler()

    val bootstrapServers = System.getenv("KAFKA_SERVICE_ADDR")
    if (bootstrapServers == null) {
        println("KAFKA_SERVICE_ADDR is not supplied")
        exitProcess(1)
    }

    val props = Properties().apply {
        this[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        this[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
        this[GROUP_ID_CONFIG] = groupID
        this[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    }

    val consumer = KafkaConsumer<String, ByteArray>(props).apply {
        subscribe(listOf(topic))
    }

    var totalCount = 0L

    // `use` closes the consumer when the loop is left via an exception.
    consumer.use {
        while (true) {
            totalCount = consumer
                .poll(ofMillis(100))
                .fold(totalCount) { accumulator, record ->
                    val newCount = accumulator + 1
                    val orders = OrderResult.parseFrom(record.value())
                    // Deliberate random failure used to exercise the crash handler.
                    if (Random.nextBoolean()) {
                        throw Exception("This is a test exception")
                    }
                    println("Consumed record with key ${record.key()} and value $orders, and updated total count to $newCount")
                    newCount
                }
        }
    }
}
/**
 * Uncaught-exception handler that reports a crash on the current OpenTelemetry span.
 *
 * The exception is recorded on `Span.current()` together with two test
 * attributes, the span status is set to `ERROR`, and the span is ended.
 *
 * Because installing a per-thread handler replaces the JVM's default reporting,
 * the exception is additionally written to standard error so the crash is never
 * swallowed silently, even if the span reporting itself fails.
 */
class CrashReportingExceptionHandler : Thread.UncaughtExceptionHandler {
    /**
     * Reports [e] on the current span and prints it to standard error.
     *
     * @param t the thread that is terminating; only its name is used, for the log line.
     * @param e the uncaught throwable; a placeholder `Exception("Unknown")` is used when null.
     */
    override fun uncaughtException(t: Thread?, e: Throwable?) {
        val currentException = e ?: java.lang.Exception("Unknown")
        try {
            val span = Span.current()
            span.recordException(
                currentException,
                Attributes.of(
                    AttributeKey.stringKey("testKey"), "testValue",
                    AttributeKey.stringArrayKey("attributeArrayKey"), listOf("testArrayValue1", "testArrayValue2")
                )
            )
            span.setStatus(StatusCode.ERROR)
            span.end()
        } finally {
            // Mirror the JVM's default output; runs even when the span calls above throw,
            // so the original failure is always visible.
            val threadName = t?.name ?: "unknown"
            System.err.print("Exception in thread \"$threadName\" ")
            currentException.printStackTrace()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment