Skip to content

Instantly share code, notes, and snippets.

@clebertsuconic
Created October 17, 2023 18:01
Show Gist options
  • Save clebertsuconic/88e25936610a178d86da90e8b2487a5d to your computer and use it in GitHub Desktop.
Save clebertsuconic/88e25936610a178d86da90e8b2487a5d to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration test that opens many AMQP receiver links against a broker queue, drops each
 * underlying connection right after the attach has been written, and then checks that the
 * queue ends up with no consumers registered.
 */
public class AMQPConsumerDroppedTest extends ActiveMQTestBase {

   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

   /** Number of client connections opened concurrently; also used to size the thread pool. */
   static final int NUMBER_OF_CONNECTIONS = 100;

   /** Name used for both the address and the anycast queue the receivers attach to. */
   private static final String QUEUE_NAME = "test-queue";

   /**
    * Starts a server with a single anycast queue, runs {@link #NUMBER_OF_CONNECTIONS} client
    * scripts in parallel (each one attaches a receiver and then drops its connection), and
    * asserts that every script finished without error and that the queue eventually reports
    * zero consumers.
    *
    * @throws Exception if the server cannot be started or the test thread is interrupted
    */
   @Test(timeout = 20_000)
   public void testConsumerDropped() throws Exception {
      ActiveMQServer server = createServer(false, createDefaultConfig(true));
      server.start();

      Queue serverQueue = server.createQueue(new QueueConfiguration(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST)
                                                                               .setAddress(QUEUE_NAME)
                                                                               .setAutoCreated(false));

      // One thread per connection so that all the clients can be in flight at the same time.
      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
      runAfter(executorService::shutdownNow);

      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
      AtomicInteger errors = new AtomicInteger(0);

      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
         executorService.execute(() -> {
            try {
               attachReceiverAndDropConnection();
            } catch (Throwable e) {
               // Failures on worker threads would otherwise be invisible to the test thread,
               // so they are counted here and asserted on below.
               errors.incrementAndGet();
               logger.warn("Client script failed while attaching and dropping the connection", e);
            } finally {
               done.countDown();
            }
         });
      }

      Assert.assertTrue("Not all client scripts completed within 10 seconds", done.await(10, TimeUnit.SECONDS));
      Assert.assertEquals("Unexpected failures in client scripts", 0, errors.get());

      // Poll, since consumer removal is observed on the server side after the clients have gone.
      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
   }

   /**
    * Runs one scripted client: SASL anonymous connect, open, begin, attach of a receiver on
    * {@link #QUEUE_NAME}, and then a drop of the connection once the last scripted handler
    * has run.
    *
    * @throws Exception if the scripted exchange fails or the peer cannot be closed
    */
   private static void attachReceiverAndDropConnection() throws Exception {
      try (ProtonTestClient peer = new ProtonTestClient()) {
         peer.queueClientSaslAnonymousConnect();
         peer.remoteOpen().queue();
         peer.expectOpen();
         peer.remoteBegin().queue();
         peer.expectBegin();
         peer.remoteAttach().ofReceiver()
                            .withName("test-client-drop")
                            .withSenderSettleModeUnsettled()
                            .withReceivervSettlesFirst()
                            .withTarget().also()
                            .withSource().withAddress(QUEUE_NAME)
                                         .withExpiryPolicyOnLinkDetach()
                                         .withDurabilityOfNone()
                                         .withCapabilities("queue")
                                         .withOutcomes("amqp:accepted:list", "amqp:rejected:list")
                                         .also()
                            .queue();
         peer.dropAfterLastHandler(); // This closes the netty connection after the attach is written

         peer.connect("localhost", 61616);

         // Waits for all the commands to fire and the drop action to be run.
         peer.waitForScriptToComplete();
      }
   }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment