Skip to content

Instantly share code, notes, and snippets.

@clebertsuconic
Created October 17, 2023 18:31
Show Gist options
  • Save clebertsuconic/0855f59ea888d46e4c1de2ee0ba74e62 to your computer and use it in GitHub Desktop.
Save clebertsuconic/0855f59ea888d46e4c1de2ee0ba74e62 to your computer and use it in GitHub Desktop.
@Test
public void testConsumerDroppedWithRegularClient() throws Exception {
int NUMBER_OF_CONNECTIONS = 100;
ActiveMQServer server = createServer(true, createDefaultConfig(true));
server.start();
Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST)
.setAddress("test-queue")
.setAutoCreated(false));
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
runAfter(executorService::shutdownNow);
CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
final AtomicBoolean running = new AtomicBoolean(true);
runAfter(() -> running.set(false));
CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
executorService.execute(() -> {
try {
boolean alreadyStarted = false;
while (running.get()) {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue("test-queue");
while (running.get()) {
if (!alreadyStarted) {
flagStart.await(10, TimeUnit.SECONDS);
alreadyStarted = true;
}
MessageConsumer consumer = session.createConsumer(jmsQueue);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
} finally {
done.countDown();
}
});
}
flagStart.await(10, TimeUnit.SECONDS);
for (int i = 0; i < 1000; i++) {
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("it's a client failure")));
Thread.sleep(10);
logger.info("loop kill {}", i);
}
running.set(false);
Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment