Skip to content

Instantly share code, notes, and snippets.

@anvinjain
Last active February 9, 2021 07:31
Show Gist options
  • Save anvinjain/9799d15b9c0edaf363ddfe5067301517 to your computer and use it in GitHub Desktop.
Save anvinjain/9799d15b9c0edaf363ddfe5067301517 to your computer and use it in GitHub Desktop.
Deadlock in Pulsar client: it does not shut down when DLQ messages are produced while client.close is called
package examples.pulsar.gist;
import org.apache.pulsar.client.api.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Reproducer: several workers share one {@code Shared}-subscription consumer that has a
 * dead-letter policy, while a small random fraction of messages fail processing and are
 * negatively acknowledged. After 30 seconds the workers are told to stop, and the consumer and
 * client are closed via try-with-resources.
 */
public class SharedConsumeWithDLQ {

    /**
     * Runs {@link #parallelConsume} on a dedicated pool for 30 seconds, then signals the workers
     * to stop and waits for the consume job to finish.
     *
     * @param args unused
     * @throws Exception if the consume job fails or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        AtomicBoolean terminator = new AtomicBoolean(false);
        int workers = 4;
        // One extra thread: parallelConsume itself occupies a pool thread while it waits
        // for the worker jobs it submits to the same pool.
        ExecutorService executor = Executors.newFixedThreadPool(workers + 1);
        try {
            CompletableFuture<Void> job = CompletableFuture.runAsync(() -> {
                try {
                    parallelConsume(workers, terminator, executor);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            }, executor);
            Thread.sleep(30 * 1000);
            terminator.set(true);
            job.get();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Creates a client and a consumer, runs {@code workers} consume loops on {@code executor}
     * until {@code terminator} is set, then prints the aggregated counts and closes the consumer
     * and client.
     *
     * <p>NOTE(review): the thread dump captured alongside this reproducer shows the implicit
     * {@code consumer.close()} at the end of the try-with-resources never returning. The client's
     * timer thread is blocked in a synchronous dead-letter {@code send} issued from
     * {@code UnAckedMessageTracker}'s redelivery task. Meanwhile, the I/O thread waits for
     * {@code UnAckedMessageTracker}'s write lock in {@code stop()}. This appears to be inside the
     * Pulsar client rather than in this code.
     *
     * @param workers    number of concurrent consume loops
     * @param terminator set to {@code true} to ask the loops to exit
     * @param executor   pool the consume loops run on
     * @throws Exception if client/consumer creation, any worker, or close fails
     */
    public static void parallelConsume(int workers, AtomicBoolean terminator, ExecutorService executor) throws Exception {
        List<CompletableFuture<Void>> jobs = new ArrayList<>();
        AtomicLong successCtr = new AtomicLong(0), failureCtr = new AtomicLong(0);
        try (PulsarClient client = createClient();
             Consumer<byte[]> consumer = createConsumer(client)) {
            try {
                for (int i = 0; i < workers; i++) {
                    jobs.add(CompletableFuture.runAsync(() -> {
                        try {
                            consumeJob(consumer, terminator, successCtr, failureCtr);
                        } catch (PulsarClientException ex) {
                            throw new RuntimeException(ex);
                        }
                    }, executor));
                }
                awaitCompletion(jobs);
            } finally {
                System.out.printf("TOTAL successfully consumed=%d, failures=%d%n", successCtr.get(), failureCtr.get());
            }
        }
    }

    /**
     * Receives and processes messages until {@code terminator} is set. Each receive waits at most
     * one second, so the flag is re-checked at least once per second.
     *
     * <p>Successfully processed messages are acknowledged asynchronously. Messages whose
     * processing (or ack submission) throws are negatively acknowledged. This loop's success and
     * failure counts are printed and added to the shared counters when the loop exits, including
     * on error.
     *
     * @param consumer   consumer to read from
     * @param terminator stop flag, polled between receives
     * @param successCtr shared success counter; may be {@code null}
     * @param failureCtr shared failure counter; may be {@code null}
     * @throws PulsarClientException if {@code receive} fails
     */
    private static <T> void consumeJob(Consumer<T> consumer, AtomicBoolean terminator, AtomicLong successCtr, AtomicLong failureCtr) throws PulsarClientException {
        long success = 0, failures = 0;
        try {
            while (!terminator.get()) {
                Message<T> message = consumer.receive(1, TimeUnit.SECONDS);
                if (message != null) {
                    try {
                        processMessage(message);
                        // Report ack failures instead of silently dropping the returned future;
                        // an un-acked message would otherwise just reappear after the ack timeout.
                        consumer.acknowledgeAsync(message).exceptionally(ackError -> {
                            System.err.printf("Failed to acknowledge message=%s: %s%n", message.getMessageId(), ackError);
                            return null;
                        });
                        success++;
                    } catch (Exception ex) {
                        // processMessage already printed the failure; hand the message back so
                        // it is redelivered (and eventually dead-lettered per the consumer's policy).
                        failures++;
                        consumer.negativeAcknowledge(message);
                    }
                }
            }
        } finally {
            System.out.printf("[%s] Consumed %d messages successfully and %d failures observed%n", Thread.currentThread().getName(), success, failures);
            if (successCtr != null) {
                successCtr.addAndGet(success);
            }
            if (failureCtr != null) {
                failureCtr.addAndGet(failures);
            }
        }
    }

    /**
     * Builds a client for a broker on localhost with client stats reported every 5 seconds.
     *
     * @return a new client
     * @throws PulsarClientException if the client cannot be built
     */
    private static PulsarClient createClient() throws PulsarClientException {
        String localEndpoint = "pulsar://localhost:6650";
        return PulsarClient.builder()
                .serviceUrl(localEndpoint)
                .statsInterval(5, TimeUnit.SECONDS)
                .build();
    }

    /**
     * Subscribes a {@code Shared} consumer with:
     * <ul>
     *   <li>a 5-second ack timeout;</li>
     *   <li>a 30-second negative-ack redelivery delay;</li>
     *   <li>a dead-letter topic with {@code maxRedeliverCount(1)}.</li>
     * </ul>
     *
     * @param client client to subscribe with
     * @return the subscribed consumer
     * @throws PulsarClientException if subscribing fails
     */
    private static Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
        return client.newConsumer()
                .topic("persistent://public/default/pt1")
                .subscriptionName("sub-d")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(5, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .deadLetterTopic("persistent://public/default/pt1-sub-d-dlq")
                        .maxRedeliverCount(1)
                        .build())
                .negativeAckRedeliveryDelay(30, TimeUnit.SECONDS)
                .subscribe();
    }

    /**
     * "Processes" a message by decoding and printing its payload. About 5% of calls fail at
     * random, so that some messages exhaust their redeliveries and reach the dead-letter topic.
     *
     * @param message message to process
     * @return the payload decoded as UTF-8
     * @throws RuntimeException on a simulated processing failure
     */
    private static <T> String processMessage(Message<T> message) throws RuntimeException {
        float messageProcessFailPercentage = 5;
        if (Math.random() * 100 < messageProcessFailPercentage) {
            // Fail message processing randomly to ensure messages are produced to DLQ
            String error = "Error processing message=" + message.getMessageId();
            System.err.println(error);
            throw new RuntimeException(error);
        }
        // Decode with an explicit charset; new String(byte[]) would use the platform default.
        String payload = new String(message.getData(), StandardCharsets.UTF_8);
        System.out.println("Processed|" + message.getMessageId() + "|" + payload);
        return payload;
    }

    /**
     * Waits for every job, even after one fails, and then reports all failures at once.
     *
     * @param jobs jobs to wait for
     * @throws CompletionException  wrapping a {@link RuntimeException} whose message lists every
     *                              failure and whose suppressed exceptions are the original failures
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   declared by {@link CompletableFuture#get()}; not expected here,
     *                              because {@code handle} converts job failures into normal completion
     */
    private static void awaitCompletion(List<CompletableFuture<Void>> jobs) throws InterruptedException, ExecutionException {
        // The handle() callback may run on the worker thread that completes the job,
        // so collect into thread-safe containers.
        List<Throwable> failures = new CopyOnWriteArrayList<>();
        StringBuffer errorBuilder = new StringBuffer("Errors:\n");
        for (CompletableFuture<Void> job : jobs) {
            job.handle((v, e) -> {
                if (e != null) {
                    failures.add(e);
                    // toString() includes the exception type and is never "null",
                    // unlike getMessage().
                    errorBuilder.append(e).append('\n');
                }
                return v;
            }).get();
        }
        if (!failures.isEmpty()) {
            RuntimeException summary = new RuntimeException(errorBuilder.toString());
            // Keep the original exceptions and their stack traces, not just their messages.
            failures.forEach(summary::addSuppressed);
            throw new CompletionException(summary);
        }
    }
}
"pulsar-client-io-1-1@2458" prio=5 tid=0x10 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
at org.apache.pulsar.client.impl.UnAckedMessageTracker.stop(UnAckedMessageTracker.java:248)
at org.apache.pulsar.client.impl.UnAckedMessageTracker.close(UnAckedMessageTracker.java:261)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$closeAsync$19(MultiTopicsConsumerImpl.java:577)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$$Lambda$149.1057807426.accept(Unknown Source:-1)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.client.impl.ConsumerImpl.cleanupAtClose(ConsumerImpl.java:1057)
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$closeAsync$8(ConsumerImpl.java:1042)
at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$148.757377063.apply(Unknown Source:-1)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.client.impl.ClientCnx.handleSuccess(ClientCnx.java:432)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:257)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"pulsar-timer-5-1@3242" prio=5 tid=0x11 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:93)
at org.apache.pulsar.client.impl.ConsumerImpl.processPossibleToDLQ(ConsumerImpl.java:1857)
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$null$18(ConsumerImpl.java:1794)
at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$109.319305176.test(Unknown Source:-1)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$redeliverUnacknowledgedMessages$20(ConsumerImpl.java:1800)
at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$108.433786663.accept(Unknown Source:-1)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.pulsar.client.impl.ConsumerImpl.redeliverUnacknowledgedMessages(ConsumerImpl.java:1792)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$redeliverUnacknowledgedMessages$24(MultiTopicsConsumerImpl.java:654)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$$Lambda$105.1296593353.accept(Unknown Source:-1)
at java.util.HashMap.forEach(HashMap.java:1289)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.redeliverUnacknowledgedMessages(MultiTopicsConsumerImpl.java:652)
at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:145)
at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-1@729" prio=5 tid=0xd nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.pulsar.client.impl.ConsumerBase.close(ConsumerBase.java:542)
at examples.pulsar.gist.SharedConsumeWithDLQ.parallelConsume(SharedConsumeWithDLQ.java:53)
at examples.pulsar.gist.SharedConsumeWithDLQ.lambda$main$0(SharedConsumeWithDLQ.java:20)
at examples.pulsar.gist.SharedConsumeWithDLQ$$Lambda$1.885951223.run(Unknown Source:-1)
at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1640)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:-1)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pulsar-client-internal-4-1@3718" prio=5 tid=0x17 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"pulsar-external-listener-3-1@3713" prio=5 tid=0x16 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-5@3634" prio=5 tid=0x15 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-4@3633" prio=5 tid=0x14 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-3@3632" prio=5 tid=0x13 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-2@3631" prio=5 tid=0x12 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"main@1" prio=5 tid=0x1 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at examples.pulsar.gist.SharedConsumeWithDLQ.main(SharedConsumeWithDLQ.java:28)
"Finalizer@3934" daemon prio=8 tid=0x3 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)
"Reference Handler@3935" daemon prio=10 tid=0x2 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"Signal Dispatcher@3933" daemon prio=9 tid=0x4 nid=NA runnable
java.lang.Thread.State: RUNNABLE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment