Skip to content

Instantly share code, notes, and snippets.

@hguerrero
Last active November 25, 2019 21:02
Show Gist options
  • Save hguerrero/0cdfb5ee9dfb05707ebd82c15af73f5a to your computer and use it in GitHub Desktop.
Save hguerrero/0cdfb5ee9dfb05707ebd82c15af73f5a to your computer and use it in GitHub Desktop.
quarkus-registry-example
# Configuration file
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.price.connector=smallrye-kafka
mp.messaging.outgoing.price.client.id=price-producer
mp.messaging.outgoing.price.topic=prices
mp.messaging.outgoing.price.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.price.apicurio.registry.url=http://localhost:8080
mp.messaging.outgoing.price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy
{
"type": "record",
"name": "price",
"namespace": "com.redhat",
"fields": [
{
"name": "symbol",
"type": "string"
},
{
"name": "price",
"type": "string"
}
]
}
package com.redhat;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
@ApplicationScoped
public class RegistryProducer {
private Random random = new Random();
private String[] symbols = new String[]{ "RHT", "IBM", "MSFT", "AMZN" };
@Outgoing("price")
public Flowable<KafkaMessage<String, String>> generate() {
return Flowable.interval(1000, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.map(tick -> {
return KafkaMessage.of(
symbols[random.nextInt(4)],
String.format("%.2f", random.nextDouble()));
});
}
}
2019-11-25 15:57:32,558 ERROR [io.sma.rea.mes.kaf.KafkaSink] (vert.x-worker-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@270dd072 was not sent to Kafka topic 'prices': javax.ws.rs.WebApplicationException: Unknown error, status code 404
at org.jboss.resteasy.microprofile.client.DefaultResponseExceptionMapper.toThrowable(DefaultResponseExceptionMapper.java:31)
at org.jboss.resteasy.microprofile.client.ExceptionMapping$HandlerException.mapException(ExceptionMapping.java:53)
at org.jboss.resteasy.microprofile.client.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:166)
at com.sun.proxy.$Proxy40.getArtifactMetaDataByContent(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.apicurio.registry.client.RegistryClient$ServiceProxy.invoke(RegistryClient.java:119)
at com.sun.proxy.$Proxy29.getArtifactMetaDataByContent(Unknown Source)
at io.apicurio.registry.utils.serde.strategy.FindBySchemaIdStrategy.findId(FindBySchemaIdStrategy.java:29)
at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:146)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:884)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:93)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:316)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2019-11-25 15:57:32,560 ERROR [io.sma.rea.mes.kaf.KafkaSink] (vert.x-worker-thread-0) Unable to dispatch message to Kafka: java.util.concurrent.CompletionException: javax.ws.rs.WebApplicationException: Unknown error, status code 404
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at io.smallrye.reactive.messaging.kafka.KafkaSink.lambda$null$1(KafkaSink.java:119)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$write$5(KafkaWriteStreamImpl.java:143)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:131)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:316)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.ws.rs.WebApplicationException: Unknown error, status code 404
at org.jboss.resteasy.microprofile.client.DefaultResponseExceptionMapper.toThrowable(DefaultResponseExceptionMapper.java:31)
at org.jboss.resteasy.microprofile.client.ExceptionMapping$HandlerException.mapException(ExceptionMapping.java:53)
at org.jboss.resteasy.microprofile.client.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:166)
at com.sun.proxy.$Proxy40.getArtifactMetaDataByContent(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.apicurio.registry.client.RegistryClient$ServiceProxy.invoke(RegistryClient.java:119)
at com.sun.proxy.$Proxy29.getArtifactMetaDataByContent(Unknown Source)
at io.apicurio.registry.utils.serde.strategy.FindBySchemaIdStrategy.findId(FindBySchemaIdStrategy.java:29)
at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:146)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:884)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:93)
... 6 more
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment