Skip to content

Instantly share code, notes, and snippets.

@ShubhamRwt
Last active August 4, 2021 08:27
Show Gist options
  • Save ShubhamRwt/6936b7a60c1c0b458efcf2b1309cb948 to your computer and use it in GitHub Desktop.
Save ShubhamRwt/6936b7a60c1c0b458efcf2b1309cb948 to your computer and use it in GitHub Desktop.
With EKC
@Test
void sendSimpleMessagesPartition(VertxTestContext context) throws InterruptedException {
String topic = "sendSimpleMessageToPartition";
KAFKA_FACADE.createTopic(topic, 2, 1);
ProtonClient client = ProtonClient.create(vertx);
Checkpoint consume = context.checkpoint();
client.connect(AmqpBridgeIT.BRIDGE_HOST, AmqpBridgeIT.BRIDGE_PORT, ar -> {
if (ar.succeeded()) {
ProtonConnection connection = ar.result();
connection.open();
ProtonSender sender = connection.createSender(null);
sender.open();
String body = "Simple message from " + connection.getContainer();
Message message = ProtonHelper.message(topic, body);
Properties config = KAFKA_FACADE.getConsumerProperties();
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = KafkaConsumer.create(this.vertx, config);
consumer.handler(record -> {
context.verify(() -> assertThat(record.value(), is(body)));
context.verify(() -> assertThat(record.partition(), is(1)));
LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
consumer.close();
consume.flag();
});
consumer.subscribe(topic, done -> {
if (!done.succeeded()) {
context.failNow(ar.cause());
}
});
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), 1);
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
sender.send(ProtonHelper.tag("my_tag"), message, delivery -> {
LOGGER.info("Message delivered {}", delivery.getRemoteState());
context.verify(() -> assertThat(Accepted.getInstance(), is(delivery.getRemoteState())));
context.completeNow();
sender.close();
connection.close();
});
} else {
context.failNow(ar.cause());
}
});
assertThat(context.awaitCompletion(60, TimeUnit.SECONDS), is(true));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment