Created
November 2, 2023 23:08
-
-
Save clebertsuconic/168228771bd724ed1af650d936ca9b3f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testDropConsumerPendingAMQP() throws Throwable { | |
CountDownLatch latchCreating = new CountDownLatch(1); | |
CountDownLatch blockCreate = new CountDownLatch(1); | |
CountDownLatch done = new CountDownLatch(1); | |
ActiveMQServer server = createServer(true, createDefaultConfig(true)); | |
server.start(); | |
/*server.registerBrokerPlugin(new ActiveMQServerConsumerPlugin() { | |
@Override | |
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException { | |
logger.info("Consumer being crated on {}", consumer); | |
latchCreating.countDown(); | |
try { | |
blockCreate.await(10, TimeUnit.HOURS); | |
} catch (Exception e) { | |
logger.warn(e.getMessage(), e); | |
} finally { | |
done.countDown(); | |
} | |
} | |
}); */ | |
server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() { | |
@Override | |
public void beforeCreateSession(String name, | |
String username, | |
int minLargeMessageSize, | |
RemotingConnection connection, | |
boolean autoCommitSends, | |
boolean autoCommitAcks, | |
boolean preAcknowledge, | |
boolean xa, | |
String defaultAddress, | |
SessionCallback callback, | |
boolean autoCreateQueues, | |
OperationContext context, | |
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException { | |
latchCreating.countDown(); | |
try { | |
blockCreate.await(10, TimeUnit.HOURS); | |
} catch (Exception e) { | |
logger.warn(e.getMessage(), e); | |
} finally { | |
done.countDown(); | |
} | |
} | |
}); | |
Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false)); | |
AtomicBoolean running = new AtomicBoolean(true); | |
ExecutorService executorService = Executors.newFixedThreadPool(1); | |
runAfter(executorService::shutdownNow); | |
runAfter(() -> running.set(false)); | |
ProtonTestClient peer = new ProtonTestClient(); | |
executorService.execute(() -> { | |
try { | |
peer.queueClientSaslAnonymousConnect(); | |
peer.remoteOpen().queue(); | |
peer.remoteBegin().queue(); | |
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress(getName()).withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue(); | |
peer.connect("localhost", 61616); | |
// Waits for all the commands to fire and the drop action to be run. | |
peer.waitForScriptToComplete(); | |
} catch (Throwable e) { | |
logger.warn(e.getMessage(), e); | |
} | |
}); | |
/*executorService.execute(() -> { | |
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); | |
try (Connection connection = connectionFactory.createConnection()) { | |
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
MessageConsumer consumer = session.createConsumer(session.createQueue(getName())); | |
connection.start(); | |
while (running.get()) { | |
Message message = consumer.receive(1000); | |
} | |
} catch (Exception e) { | |
logger.warn(e.getMessage(), e); | |
} | |
}); */ | |
Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS)); | |
server.getRemotingService().getConnections().forEach(r -> { | |
r.fail(new ActiveMQException("it's a simulation")); | |
}); | |
blockCreate.countDown(); | |
Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); | |
Thread.sleep(1000); | |
Wait.assertEquals(0, serverQueue::getConsumerCount, 5000); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment