Last active
August 4, 2021 08:27
-
-
Save ShubhamRwt/6936b7a60c1c0b458efcf2b1309cb948 to your computer and use it in GitHub Desktop.
With EKC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
void sendSimpleMessagesPartition(VertxTestContext context) throws InterruptedException { | |
String topic = "sendSimpleMessageToPartition"; | |
KAFKA_FACADE.createTopic(topic, 2, 1); | |
ProtonClient client = ProtonClient.create(vertx); | |
Checkpoint consume = context.checkpoint(); | |
client.connect(AmqpBridgeIT.BRIDGE_HOST, AmqpBridgeIT.BRIDGE_PORT, ar -> { | |
if (ar.succeeded()) { | |
ProtonConnection connection = ar.result(); | |
connection.open(); | |
ProtonSender sender = connection.createSender(null); | |
sender.open(); | |
String body = "Simple message from " + connection.getContainer(); | |
Message message = ProtonHelper.message(topic, body); | |
Properties config = KAFKA_FACADE.getConsumerProperties(); | |
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
KafkaConsumer<String, String> consumer = KafkaConsumer.create(this.vertx, config); | |
consumer.handler(record -> { | |
context.verify(() -> assertThat(record.value(), is(body))); | |
context.verify(() -> assertThat(record.partition(), is(1))); | |
LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}", | |
record.topic(), record.partition(), record.offset(), record.key(), record.value()); | |
consumer.close(); | |
consume.flag(); | |
}); | |
consumer.subscribe(topic, done -> { | |
if (!done.succeeded()) { | |
context.failNow(ar.cause()); | |
} | |
}); | |
Map<Symbol, Object> map = new HashMap<>(); | |
map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), 1); | |
MessageAnnotations messageAnnotations = new MessageAnnotations(map); | |
message.setMessageAnnotations(messageAnnotations); | |
sender.send(ProtonHelper.tag("my_tag"), message, delivery -> { | |
LOGGER.info("Message delivered {}", delivery.getRemoteState()); | |
context.verify(() -> assertThat(Accepted.getInstance(), is(delivery.getRemoteState()))); | |
context.completeNow(); | |
sender.close(); | |
connection.close(); | |
}); | |
} else { | |
context.failNow(ar.cause()); | |
} | |
}); | |
assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment