
@labianchin
Last active July 22, 2022 09:33

Context

On June 28th we made some library upgrades to our service. The upgrade included a bump of google-cloud-pubsub from 1.115.5 to 1.119.0, which caused some of our workloads to crash with OutOfMemoryError. Not all workloads were affected. The most affected ones consume from topics whose average message size ranges from 10 KiB to 100 KiB.

Investigation: when taking a heap histogram (with jmap -histo), we noticed significantly more com.google.pubsub.v1.PubsubMessage and com.google.protobuf.ByteString$LiteralByteString objects with the newer google-cloud-pubsub version.

Hypotheses: a) Could the changes in googleapis/java-pubsub#1022 have introduced an issue? b) Was a memory leak introduced?

Sample code

To reproduce the problem, we built a simpler sample program that emulates our application. We tested it with different versions of the library, using a subscription with real data whose average message size is about 70 KiB. We ran PubSubConsumerExample1.java with -XX:ActiveProcessorCount=2 -Xmx2g.

Then to get the heap histogram, we used:

jmap -histo $(jps -l -m | grep PubSubConsumerExample1 | head -1 | cut -d' ' -f1) | grep -e 'pubsub\|ByteString\|B \|I \|nio'  | head -n 12
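If attaching jmap is inconvenient, for example inside a container, a similar histogram can be taken from inside the process. The sketch below is minimal and assumes a HotSpot JVM, such as the JDK 11 used here. It invokes the gcClassHistogram operation of the com.sun.management:type=DiagnosticCommand MBean, which is the same diagnostic command that jcmd <pid> GC.class_histogram runs.

Unlike plain jmap -histo, this command counts only live objects by default, so it triggers a full GC first. The class name HeapHistogram and the filter regex are illustrative.

```java
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class HeapHistogram {
  // HotSpot exposes jcmd diagnostic commands through this MBean;
  // the "gcClassHistogram" operation corresponds to GC.class_histogram.
  private static final String DIAGNOSTIC_COMMAND = "com.sun.management:type=DiagnosticCommand";

  /** Returns the class histogram of the current JVM as text (one row per class). */
  public static String classHistogram() throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    return (String)
        server.invoke(
            new ObjectName(DIAGNOSTIC_COMMAND),
            "gcClassHistogram",
            new Object[] {new String[0]}, // no extra command options
            new String[] {String[].class.getName()});
  }

  /** Keeps only the rows matching the regex, similar to the grep in the jmap pipeline. */
  public static List<String> filter(String histogram, String regex) {
    Pattern pattern = Pattern.compile(regex);
    return Arrays.stream(histogram.split("\\R"))
        .filter(line -> pattern.matcher(line).find())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws Exception {
    filter(classHistogram(), "pubsub|ByteString|\\[B |\\[I |nio").stream()
        .limit(12)
        .forEach(System.out::println);
  }
}
```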

We also used JMC to take a heap dump.
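A heap dump can also be written programmatically. The sketch below is minimal and assumes a HotSpot JVM. HotSpotDiagnosticMXBean.dumpHeap writes an .hprof file that tools such as Eclipse MAT or VisualVM can open. With live set to true, only reachable objects are dumped, and those are the objects of interest when looking for retained PubsubMessage instances.

The class HeapDumper and the output file name are hypothetical.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeapDumper {
  /** Writes an .hprof heap dump of this JVM; live=true dumps only reachable objects. */
  public static Path dumpHeap(Path file, boolean live) throws IOException {
    HotSpotDiagnosticMXBean bean =
        ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
    // The target file must not exist yet, and its name should end in ".hprof".
    bean.dumpHeap(file.toString(), live);
    return file;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("heapdump");
    Path dump = dumpHeap(dir.resolve("pubsub-consumer.hprof"), true);
    System.out.println("Heap dump written to " + dump + " (" + Files.size(dump) + " bytes)");
  }
}
```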

Some results

# 1.115.5
num     #instances         #bytes  class name (module)
  8:         12243         587664  java.nio.HeapByteBuffer (java.base@11.0.15)
 10:         10093         403720  com.google.cloud.pubsub.v1.MessageDispatcher$AckHandler
 14:         12218         293232  com.google.cloud.pubsub.it.PubSubConsumerExample1$ContainerObj1
 19:          9751         234024  com.google.cloud.pubsub.v1.MessageDispatcher$3
 22:          3002         192128  java.nio.DirectByteBuffer (java.base@11.0.15)
 25:          9750         156000  com.google.cloud.pubsub.it.PubSubConsumerExample1$$Lambda$155/0x0000000800420840

# 1.119.0
# Note the significantly larger number of PubsubMessage and ByteString objects
num     #instances         #bytes  class name (module)
   1:         54045      493944336  [B (java.base@11.0.15)
   2:          4469       26687888  [I (java.base@11.0.15)
   8:          6201         396864  java.nio.DirectByteBuffer (java.base@11.0.15)
  14:          3386         162528  java.nio.HeapByteBuffer (java.base@11.0.15)
  15:          3362         161376  com.google.pubsub.v1.PubsubMessage
  19:          3361         134440  com.google.cloud.pubsub.v1.MessageDispatcher$AckHandler
  23:          3361         107552  com.google.cloud.pubsub.v1.MessageDispatcher$3
  26:          3851          92424  com.google.protobuf.ByteString$LiteralByteString
  34:          3361          80664  com.google.cloud.pubsub.it.PubSubConsumerExample1$ContainerObj1
  35:          3361          80664  com.google.cloud.pubsub.v1.AckRequestData
  40:          2629          61976  [Ljava.nio.ByteBuffer; (java.base@11.0.15)
  46:          3361          53776  com.google.cloud.pubsub.it.PubSubConsumerExample1$$Lambda$175/0x000000080043b840


# custom patched 1.119.0 (avoids anonymous AckReplyConsumer instances)
# Note: MessageDispatcher$3 was replaced by AckReplyConsumerImpl, which has less overhead (32 bytes per instance before, 16 bytes now)
# Also, PubsubMessage and ByteString$LiteralByteString no longer show up (no leak)
num     #instances         #bytes  class name (module)
   1:         56048      382810944  [B (java.base@11.0.15)
   2:          4016        7620824  [I (java.base@11.0.15)
   6:         15395         985280  java.nio.DirectByteBuffer (java.base@11.0.15)
  15:          4456         213888  java.nio.HeapByteBuffer (java.base@11.0.15)
  18:          4407         176280  com.google.cloud.pubsub.v1.MessageDispatcher$AckHandler
  22:          6014         131696  [Ljava.nio.ByteBuffer; (java.base@11.0.15)
  30:          4407         105768  com.google.cloud.pubsub.it.PubSubConsumerExample1$ContainerObj1
  31:          4407         105768  com.google.cloud.pubsub.v1.AckRequestData
  45:          4407          70512  com.google.cloud.pubsub.it.PubSubConsumerExample1$$Lambda$180/0x00000008004b8440
  46:          4407          70512  com.google.cloud.pubsub.v1.AckReplyConsumerImpl
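The #bytes column of a class histogram is the shallow size of all instances of a class. Dividing it by #instances therefore gives the size of one instance. The small sketch below does this for rows copied from the histograms above; the class name HistogramRowSize is illustrative.

The results are:

- MessageDispatcher$3: 24 bytes in 1.115.5 and 32 bytes in 1.119.0.
- AckReplyConsumerImpl (patched build): 16 bytes.
- PubsubMessage: 48 bytes.

With compressed references (the default for a 2 GiB heap), the growth from 24 to 32 bytes is consistent with the anonymous class holding more references. Because these are shallow sizes, the per-consumer difference is small by itself. The larger cost is presumably what the consumer keeps reachable, such as the PubsubMessage and the [B arrays behind its ByteString.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HistogramRowSize {
  /** Average shallow size of one instance, from a "num: #instances #bytes class" histogram row. */
  public static long bytesPerInstance(String row) {
    String[] cols = row.trim().split("\\s+");
    long instances = Long.parseLong(cols[1]);
    long bytes = Long.parseLong(cols[2]);
    return bytes / instances;
  }

  public static void main(String[] args) {
    // Rows copied from the histograms above (label -> raw row).
    Map<String, String> rows = new LinkedHashMap<>();
    rows.put("1.115.5 MessageDispatcher$3",
        " 19:          9751         234024  com.google.cloud.pubsub.v1.MessageDispatcher$3");
    rows.put("1.119.0 MessageDispatcher$3",
        "  23:          3361         107552  com.google.cloud.pubsub.v1.MessageDispatcher$3");
    rows.put("patched AckReplyConsumerImpl",
        "  46:          4407          70512  com.google.cloud.pubsub.v1.AckReplyConsumerImpl");
    rows.put("1.119.0 PubsubMessage",
        "  15:          3362         161376  com.google.pubsub.v1.PubsubMessage");
    rows.forEach(
        (label, row) -> System.out.println(label + ": " + bytesPerInstance(row) + " bytes/instance"));
  }
}
```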

package com.google.cloud.pubsub.it;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This example code uses google-cloud-pubsub to asynchronously pull messages.
 * It batches messages and every few seconds releases them (here via nack).
 * https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-flow-settings
 */
public class PubSubConsumerExample1 {
  private static final Logger LOG = Logger.getLogger(PubSubConsumerExample1.class.getName());

  // Each batch expires 10 s after it is created; the removal listener then completes
  // every message in the expired batch.
  private static final LoadingCache<Integer, List<ContainerObj1>> EXPIRING_BATCH =
      CacheBuilder.newBuilder()
          .concurrencyLevel(4)
          .maximumSize(1000)
          .expireAfterWrite(10, TimeUnit.SECONDS)
          .removalListener(v -> completeBatch((List<ContainerObj1>) v.getValue()))
          .build(
              new CacheLoader<Integer, List<ContainerObj1>>() {
                @Override
                public List<ContainerObj1> load(final Integer key) {
                  return new ArrayList<>(1000);
                }
              });

  private static void completeBatch(List<ContainerObj1> batch) {
    if (batch.hashCode() % 10 == 0) {
      LOG.info(String.format("Completing batch with %s items", batch.size()));
    }
    // Note: here it would "flush" the batch to GCS and only then proceed to ack all the messages
    batch.forEach(ContainerObj1::complete);
  }

  private static final AtomicLong TOTAL_SIZE = new AtomicLong(0L);
  private static final AtomicLong TOTAL_COUNT = new AtomicLong(0L);

  public static void main(String... args) throws Exception {
    // Use: -XX:ActiveProcessorCount=2 -Xmx2g
    LOG.info(
        String.format(
            "availableProcessors: %s ; maxMemory: %s",
            Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().maxMemory()));
    // gcloud pubsub --project $PROJECT subscriptions create test1-setmodification
    //   --topic $TOPIC --expiration-period=1d
    //   --message-retention-duration=1h
    Thread.sleep(5000); // sleep a bit so we can connect JMC
    subscribeAsyncExample(args[0], args[1]); // args: <projectId> <subscriptionId>
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    final ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    Subscriber subscriber = null;
    try {
      subscriber = buildSubscriber(subscriptionName, PubSubConsumerExample1::receiver);
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }

  private static Subscriber buildSubscriber(
      ProjectSubscriptionName subscriptionName, MessageReceiver messageReceiver) {
    final FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(300000L) // 300k messages
            .setMaxOutstandingRequestBytes(3221225472L) // 3 GiB
            .build();
    return Subscriber.newBuilder(subscriptionName, messageReceiver)
        .setFlowControlSettings(flowControlSettings)
        .setParallelPullCount(4 * Math.max(1, Runtime.getRuntime().availableProcessors()))
        .build();
  }

  private static void receiver(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
    // The callback below captures ackReplyConsumer, so the consumer stays reachable
    // until the batch holding this message completes.
    handle(pubsubMessage.getMessageId(), pubsubMessage.getData())
        .whenComplete(
            (msgId, ex) -> {
              if (ex != null) {
                LOG.log(Level.WARNING, "Will not ack message due to: ", ex);
              }
              // nack messages always
              ackReplyConsumer.nack();
            });
  }

  private static synchronized CompletionStage<String> handle(String messageId, ByteString data) {
    TOTAL_SIZE.addAndGet(data.size());
    TOTAL_COUNT.incrementAndGet();
    if (messageId.hashCode() % 100 == 0) {
      LOG.info(
          String.format(
              "Got messageId %s with size %d ; totalSize %d totalCount %d avgSize %d",
              messageId,
              data.size(),
              TOTAL_SIZE.get(),
              TOTAL_COUNT.get(),
              TOTAL_SIZE.get() / TOTAL_COUNT.get()));
    }
    final ContainerObj1 containerObj = new ContainerObj1(data, messageId);
    try {
      // keep about 3 ongoing batches, assign to one of them randomly
      EXPIRING_BATCH.get(ThreadLocalRandom.current().nextInt(3)).add(containerObj);
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    return containerObj.future;
  }

  private static class ContainerObj1 {
    final ByteBuffer data;
    final CompletableFuture<String> future;
    final String ackId; // in this example, the message ID is passed here

    private ContainerObj1(final ByteString data, final String ackId) {
      // Deliberately keep a copy of the payload instead of a reference to the ByteString
      this.data = ByteBuffer.wrap(data.toByteArray());
      this.ackId = ackId;
      this.future = new CompletableFuture<>();
    }

    public void complete() {
      this.future.complete(this.ackId);
    }
  }
}
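With the flow control settings in buildSubscriber and messages of about 70 KiB, the byte limit, not the element limit, bounds the number of outstanding messages. That byte limit (3 GiB) is larger than the 2 GiB heap set with -Xmx2g.

The small sketch below works through the arithmetic. It assumes each message counts roughly its average size against setMaxOutstandingRequestBytes. The class and method names are illustrative.

This suggests that flow control alone does not keep outstanding data within the heap at this message size. It can therefore matter a great deal whether the original PubsubMessage is released after receiveMessage returns, or is retained alongside the copy held in ContainerObj1 until the message is nacked.

```java
public class FlowControlBudget {
  /** Messages that can be outstanding before either flow-control limit stops further delivery. */
  public static long maxOutstandingMessages(
      long maxElementCount, long maxRequestBytes, long avgMessageBytes) {
    return Math.min(maxElementCount, maxRequestBytes / avgMessageBytes);
  }

  public static void main(String[] args) {
    long maxElements = 300_000L; // setMaxOutstandingElementCount in buildSubscriber
    long maxBytes = 3_221_225_472L; // setMaxOutstandingRequestBytes, 3 GiB
    long avgMessageBytes = 70L * 1024; // assumption: ~70 KiB average, as in the test subscription
    long heapBytes = 2L * 1024 * 1024 * 1024; // -Xmx2g

    long messages = maxOutstandingMessages(maxElements, maxBytes, avgMessageBytes);
    System.out.printf("Outstanding messages allowed: %d (element limit: %d)%n", messages, maxElements);
    System.out.printf(
        "Outstanding payload allowed: %d MiB vs heap: %d MiB%n",
        messages * avgMessageBytes / (1024 * 1024), heapBytes / (1024 * 1024));
  }
}
```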
From 9911c9aad73481e0d4a18b64f1f9d12e8a3d997b Mon Sep 17 00:00:00 2001
From: Luis Bianchin <labianchin@users.noreply.github.com>
Date: Fri, 22 Jul 2022 11:01:26 +0200
Subject: [PATCH] Avoid PubSubMessage leak on
MessageDispatcher#processOutstandingMessage
---
.../cloud/pubsub/v1/AckReplyConsumerImpl.java | 21 ++++++++++++++
.../v1/AckReplyConsumerWithResponseImpl.java | 27 ++++++++++++++++++
.../cloud/pubsub/v1/MessageDispatcher.java | 28 ++-----------------
3 files changed, 50 insertions(+), 26 deletions(-)
create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java
create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java
new file mode 100644
index 0000000..7510c14
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerImpl.java
@@ -0,0 +1,21 @@
+package com.google.cloud.pubsub.v1;
+
+import com.google.api.core.SettableApiFuture;
+
+public class AckReplyConsumerImpl implements AckReplyConsumer {
+ final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture;
+
+ public AckReplyConsumerImpl(final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture) {
+ this.ackReplySettableApiFuture = ackReplySettableApiFuture;
+ }
+
+ @Override
+ public void ack() {
+ ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK);
+ }
+
+ @Override
+ public void nack() {
+ ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK);
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java
new file mode 100644
index 0000000..411e15c
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponseImpl.java
@@ -0,0 +1,27 @@
+package com.google.cloud.pubsub.v1;
+
+import com.google.api.core.SettableApiFuture;
+
+import java.util.concurrent.Future;
+
+public class AckReplyConsumerWithResponseImpl implements AckReplyConsumerWithResponse {
+ final SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture;
+ final SettableApiFuture<AckResponse> messageFuture;
+
+ public AckReplyConsumerWithResponseImpl(SettableApiFuture<MessageDispatcher.AckReply> ackReplySettableApiFuture, SettableApiFuture<AckResponse> messageFuture) {
+ this.ackReplySettableApiFuture = ackReplySettableApiFuture;
+ this.messageFuture = messageFuture;
+ }
+
+ @Override
+ public Future<AckResponse> ack() {
+ ackReplySettableApiFuture.set(MessageDispatcher.AckReply.ACK);
+ return messageFuture;
+ }
+
+ @Override
+ public Future<AckResponse> nack() {
+ ackReplySettableApiFuture.set(MessageDispatcher.AckReply.NACK);
+ return messageFuture;
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index de51d3b..a5bb47b 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -433,34 +433,10 @@ class MessageDispatcher {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
ackHandler.getMessageFutureIfExists();
- final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
- new AckReplyConsumerWithResponse() {
- @Override
- public Future<AckResponse> ack() {
- ackReplySettableApiFuture.set(AckReply.ACK);
- return messageFuture;
- }
-
- @Override
- public Future<AckResponse> nack() {
- ackReplySettableApiFuture.set(AckReply.NACK);
- return messageFuture;
- }
- };
+ final AckReplyConsumerWithResponse ackReplyConsumerWithResponse = new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture);
receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
} else {
- final AckReplyConsumer ackReplyConsumer =
- new AckReplyConsumer() {
- @Override
- public void ack() {
- ackReplySettableApiFuture.set(AckReply.ACK);
- }
-
- @Override
- public void nack() {
- ackReplySettableApiFuture.set(AckReply.NACK);
- }
- };
+ final AckReplyConsumer ackReplyConsumer = new AckReplyConsumerImpl(ackReplySettableApiFuture);
receiver.receiveMessage(message, ackReplyConsumer);
}
} catch (Exception e) {
--
2.37.0
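The patch replaces the anonymous AckReplyConsumer and AckReplyConsumerWithResponse classes in MessageDispatcher with standalone classes that hold only the futures they need. One plausible reading of the histograms, not verified here, is that each anonymous consumer keeps its enclosing instance reachable. Any code that holds the consumer would then also keep alive whatever that enclosing object references, such as the PubsubMessage and its ByteString. In the sample, that code is the whenComplete callback in receiver, which waits for a batch to complete.

The sketch below models this mechanism with hypothetical classes (AckConsumer, DeliveryTask, StandaloneConsumer); it is not the library's code. It uses reflection to list which objects each consumer references directly.

```java
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CaptureDemo {
  /** Hypothetical stand-in for AckReplyConsumer. */
  public interface AckConsumer {
    void ack();
  }

  /** Hypothetical stand-in for a delivery task that holds a large message payload. */
  public static class DeliveryTask {
    final byte[] payload = new byte[64 * 1024]; // plays the role of the PubsubMessage data
    final CompletableFuture<String> reply = new CompletableFuture<>();

    /** Anonymous class: reading reply goes through a synthetic reference to this task. */
    public AckConsumer anonymousConsumer() {
      return new AckConsumer() {
        @Override
        public void ack() {
          reply.complete("ACK");
        }
      };
    }

    /** Standalone class: holds only the future it is given, like AckReplyConsumerImpl. */
    public AckConsumer standaloneConsumer() {
      return new StandaloneConsumer(reply);
    }
  }

  static final class StandaloneConsumer implements AckConsumer {
    private final CompletableFuture<String> reply;

    StandaloneConsumer(CompletableFuture<String> reply) {
      this.reply = reply;
    }

    @Override
    public void ack() {
      reply.complete("ACK");
    }
  }

  /** Types of an object's instance fields, i.e. what it keeps directly reachable. */
  public static List<String> instanceFieldTypes(Object o) {
    return Arrays.stream(o.getClass().getDeclaredFields())
        .filter(f -> !Modifier.isStatic(f.getModifiers()))
        .map(f -> f.getType().getSimpleName())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    DeliveryTask task = new DeliveryTask();
    // Holding the anonymous consumer keeps the whole task, including its payload, reachable.
    System.out.println("anonymous:  " + instanceFieldTypes(task.anonymousConsumer()));
    // Holding the standalone consumer keeps only the future reachable.
    System.out.println("standalone: " + instanceFieldTypes(task.standaloneConsumer()));
  }
}
```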