Skip to content

Instantly share code, notes, and snippets.

@clebertsuconic
Created November 2, 2023 23:08
Show Gist options
  • Save clebertsuconic/168228771bd724ed1af650d936ca9b3f to your computer and use it in GitHub Desktop.
Save clebertsuconic/168228771bd724ed1af650d936ca9b3f to your computer and use it in GitHub Desktop.
/**
 * Fails every remoting connection while an AMQP session creation is still being held inside a
 * broker plugin, then verifies that the queue ends up with no consumers once the creation is released.
 *
 * <p>Sequence:
 * <ol>
 *    <li>A session plugin signals {@code latchCreating} from {@code beforeCreateSession} and then parks on
 *        {@code blockCreate}.</li>
 *    <li>A scripted AMQP test client connects to {@code localhost:61616} and queues open, begin and a receiver
 *        attach against the test queue.</li>
 *    <li>Once the plugin reports that session creation is in flight, every server-side connection is failed and
 *        the plugin is released.</li>
 *    <li>The queue's consumer count is expected to be {@code 0}.</li>
 * </ol>
 *
 * @throws Throwable if the broker cannot be started or any assertion fails
 */
@Test
public void testDropConsumerPendingAMQP() throws Throwable {
   CountDownLatch latchCreating = new CountDownLatch(1);
   CountDownLatch blockCreate = new CountDownLatch(1);
   CountDownLatch done = new CountDownLatch(1);

   ActiveMQServer server = createServer(true, createDefaultConfig(true));
   server.start();

   server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() {
      @Override
      public void beforeCreateSession(String name,
                                      String username,
                                      int minLargeMessageSize,
                                      RemotingConnection connection,
                                      boolean autoCommitSends,
                                      boolean autoCommitAcks,
                                      boolean preAcknowledge,
                                      boolean xa,
                                      String defaultAddress,
                                      SessionCallback callback,
                                      boolean autoCreateQueues,
                                      OperationContext context,
                                      Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
         // Tell the test thread a session creation is in flight, then hold it until the test releases us.
         latchCreating.countDown();
         try {
            blockCreate.await(10, TimeUnit.HOURS);
         } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread instead of silently swallowing it.
            Thread.currentThread().interrupt();
            logger.warn(e.getMessage(), e);
         } finally {
            done.countDown();
         }
      }
   });

   Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false));

   ExecutorService executorService = Executors.newFixedThreadPool(1);
   runAfter(executorService::shutdownNow);

   ProtonTestClient peer = new ProtonTestClient();
   // Make sure the scripted client is released even when the test fails half way through.
   runAfter(peer::close);

   executorService.execute(() -> {
      try {
         peer.queueClientSaslAnonymousConnect();
         peer.remoteOpen().queue();
         peer.remoteBegin().queue();
         peer.remoteAttach().ofReceiver()
             .withName(RandomUtil.randomString())
             .withSenderSettleModeUnsettled()
             .withReceivervSettlesFirst()
             .withTarget().also()
             .withSource().withAddress(getName())
                          .withExpiryPolicyOnLinkDetach()
                          .withDurabilityOfNone()
                          .withCapabilities("queue")
                          .withOutcomes("amqp:accepted:list", "amqp:rejected:list")
             .also()
             .queue();
         peer.connect("localhost", 61616);
         // Blocks this worker until the scripted frames queued above have been processed.
         peer.waitForScriptToComplete();
      } catch (Throwable e) {
         logger.warn(e.getMessage(), e);
      }
   });

   try {
      Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS));
      // Drop every connection while the session creation is still parked inside the plugin.
      server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("it's a simulation")));
   } finally {
      // Always release the plugin; otherwise a failed assertion above would leave a broker thread
      // parked for up to ten hours.
      blockCreate.countDown();
   }

   Assert.assertTrue(done.await(10, TimeUnit.SECONDS));

   // NOTE(review): the count is already 0 before the attach is processed, so this pause presumably gives the
   // released creation a chance to register a stray consumer before we check - confirm before shortening.
   Thread.sleep(1000);
   Wait.assertEquals(0, serverQueue::getConsumerCount, 5000);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment