Created
January 23, 2019 18:59
-
-
Save rdhabalia/11f99dfb4f70c1bcc82c9f91eaffa47f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testPartitionStuckTopic() throws Exception{ | |
log.info("-- Starting {} test --", methodName); | |
int numPartitions = 2; | |
DestinationName dn = DestinationName.get("persistent://my-property/use/my-ns/my-partition"); | |
ConsumerConfiguration conf = new ConsumerConfiguration(); | |
conf.setSubscriptionType(SubscriptionType.Failover); | |
admin.persistentTopics().createPartitionedTopic(dn.toString(), numPartitions); | |
final String sub1 = "my-partitioned-subscriber-ne1"; | |
final String sub2 = "my-partitioned-subscriber-gq1"; | |
final String sub3 = "my-partitioned-subscriber-tw1"; | |
final String sub4 = "my-partitioned-subscriber-bf1"; | |
ProducerConfiguration producerConf = new ProducerConfiguration(); | |
producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); | |
Producer producer = pulsarClient.createProducer(dn.toString(), producerConf); | |
Consumer consumer1 = pulsarClient.subscribe(dn.toString(), sub1, conf); | |
Consumer consumer2 = pulsarClient.subscribe(dn.toString(), sub2, conf); | |
Consumer consumer3 = pulsarClient.subscribe(dn.toString(), sub3, conf); | |
Consumer consumer4 = pulsarClient.subscribe(dn.toString(), sub4, conf); | |
for (int i = 0; i < 10; i++) { | |
String message = "my-message-" + i; | |
producer.send(message.getBytes()); | |
} | |
admin.namespaces().unload("my-property/use/my-ns"); | |
Thread.sleep(5); | |
for (int i = 0; i < 1000; i++) { | |
System.out.println("************ Starting consumer times: ************"+ i); | |
consumer1.close(); | |
consumer2.close(); | |
consumer3.close(); | |
consumer4.close(); | |
Thread.sleep(5); | |
consumer1 = pulsarClient.subscribe(dn.toString(), sub1, conf); | |
consumer2 = pulsarClient.subscribe(dn.toString(), sub2, conf); | |
consumer3 = pulsarClient.subscribe(dn.toString(), sub3, conf); | |
consumer4 = pulsarClient.subscribe(dn.toString(), sub4, conf); | |
for (int j = 0; j < 10; j++) { | |
Message msg1 = null; | |
Message msg2 = null; | |
Message msg3 = null; | |
Message msg4 = null; | |
try { | |
msg1 = consumer1.receive(5, TimeUnit.SECONDS); | |
msg2 = consumer2.receive(5, TimeUnit.SECONDS); | |
msg3 = consumer3.receive(5, TimeUnit.SECONDS); | |
msg4 = consumer4.receive(5, TimeUnit.SECONDS); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
if (msg1 == null || msg2 == null || msg3 == null || msg4 == null) { | |
List<CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics().values(); | |
for (CompletableFuture<Topic> topic : topics) { | |
String sub = (msg1 == null ? sub1 : (msg2 == null ? sub2 : msg3 == null ? sub3 : sub4)); | |
PersistentDispatcherSingleActiveConsumer subscription = (PersistentDispatcherSingleActiveConsumer) topic | |
.get().getSubscription(sub).getDispatcher(); | |
System.out.println("havePendingRead = " + subscription.havePendingRead); | |
if(!subscription.havePendingRead) { | |
System.out.println("reproduce"); | |
} | |
} | |
} | |
} | |
Thread.sleep(2); | |
} | |
producer.close(); | |
log.info("-- Exiting {} test --", methodName); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment