Skip to content

Instantly share code, notes, and snippets.

@rdhabalia
Created January 23, 2019 18:59
Show Gist options
  • Save rdhabalia/11f99dfb4f70c1bcc82c9f91eaffa47f to your computer and use it in GitHub Desktop.
Save rdhabalia/11f99dfb4f70c1bcc82c9f91eaffa47f to your computer and use it in GitHub Desktop.
/**
 * Tries to reproduce a stalled dispatcher on a partitioned topic with Failover subscriptions.
 *
 * <p>The test creates a two-partition topic, attaches one Failover consumer on each of four
 * subscriptions, publishes ten messages and unloads the namespace. It then repeatedly closes and
 * re-creates all four consumers and tries to receive ten messages on each. Messages are never
 * acknowledged by this test. Whenever a receive comes back empty (or fails), the dispatcher of the
 * first affected subscription is inspected on every topic currently loaded in the broker, and
 * "reproduce" is printed when that dispatcher reports no pending read.
 *
 * <p>This is a diagnostic test: it prints its findings and does not assert on them.
 *
 * @throws Exception if topic creation, publishing, unloading, (re)subscribing or closing fails
 */
@Test
public void testPartitionStuckTopic() throws Exception {
    log.info("-- Starting {} test --", methodName);

    final int numPartitions = 2;
    final int numMessages = 10;
    final int reconnectRounds = 1000;
    final DestinationName dn = DestinationName.get("persistent://my-property/use/my-ns/my-partition");
    final String topicName = dn.toString();

    ConsumerConfiguration conf = new ConsumerConfiguration();
    conf.setSubscriptionType(SubscriptionType.Failover);
    admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);

    // One consumer per subscription; index i of both arrays belongs together.
    final String[] subscriptions = {
        "my-partitioned-subscriber-ne1",
        "my-partitioned-subscriber-gq1",
        "my-partitioned-subscriber-tw1",
        "my-partitioned-subscriber-bf1"
    };
    Consumer[] consumers = new Consumer[subscriptions.length];

    ProducerConfiguration producerConf = new ProducerConfiguration();
    producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
    Producer producer = pulsarClient.createProducer(topicName, producerConf);

    try {
        for (int c = 0; c < consumers.length; c++) {
            consumers[c] = pulsarClient.subscribe(topicName, subscriptions[c], conf);
        }

        for (int i = 0; i < numMessages; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }

        admin.namespaces().unload("my-property/use/my-ns");
        Thread.sleep(5);

        for (int i = 0; i < reconnectRounds; i++) {
            System.out.println("************ Starting consumer times: ************" + i);

            // Drop every consumer, pause briefly, then re-attach all of them.
            for (Consumer consumer : consumers) {
                consumer.close();
            }
            Thread.sleep(5);
            for (int c = 0; c < consumers.length; c++) {
                consumers[c] = pulsarClient.subscribe(topicName, subscriptions[c], conf);
            }

            for (int j = 0; j < numMessages; j++) {
                Message[] received = new Message[consumers.length];
                try {
                    for (int c = 0; c < consumers.length; c++) {
                        received[c] = consumers[c].receive(5, TimeUnit.SECONDS);
                    }
                } catch (Exception e) {
                    // A failed receive leaves the remaining slots null, so it is reported below
                    // exactly like a timed-out receive.
                    log.error("Failed to receive message", e);
                }

                // Only the first subscription that came back empty is inspected.
                String stalledSub = null;
                for (int c = 0; c < received.length; c++) {
                    if (received[c] == null) {
                        stalledSub = subscriptions[c];
                        break;
                    }
                }
                if (stalledSub == null) {
                    continue;
                }

                List<CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics().values();
                for (CompletableFuture<Topic> topic : topics) {
                    PersistentDispatcherSingleActiveConsumer subscription =
                            (PersistentDispatcherSingleActiveConsumer) topic
                                    .get().getSubscription(stalledSub).getDispatcher();
                    System.out.println("havePendingRead = " + subscription.havePendingRead);
                    if (!subscription.havePendingRead) {
                        System.out.println("reproduce");
                    }
                }
            }
            Thread.sleep(2);
        }
    } finally {
        // Release client resources even when the test aborts part-way through.
        try {
            for (Consumer consumer : consumers) {
                if (consumer != null) {
                    consumer.close();
                }
            }
        } finally {
            producer.close();
        }
    }

    log.info("-- Exiting {} test --", methodName);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment