Skip to content

Instantly share code, notes, and snippets.

@stetra
Created August 16, 2018 01:09
Show Gist options
  • Save stetra/d757fed41cb67d4a73dd7487ca4d452e to your computer and use it in GitHub Desktop.
Save stetra/d757fed41cb67d4a73dd7487ca4d452e to your computer and use it in GitHub Desktop.
package com.example;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class Repro {
private static final Logger logger = LoggerFactory.getLogger(Repro.class);
private final static String project = "lofty-outcome-860";
private final static String subscription = "test-sub";
private final static int sleepMillis = 240_000;
public static class StreamingMessageReceiver implements MessageReceiver {
public AtomicInteger count;
public StreamingMessageReceiver() {
count = new AtomicInteger();
}
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
try {
int c = count.incrementAndGet();
logger.debug("Received message #{} {} {}, acking after {} ms", c, message.getMessageId(), message.getData().toStringUtf8(), sleepMillis);
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.warn("Thread was interrupted", e);
}
logger.debug("Acking #{} {}", c, message.getMessageId());
consumer.ack();
} catch (Throwable t) {
logger.error("StreamingMessageReceiver failed with {}", t.getMessage(), t);
}
}
}
public static void doSubscribe() {
MessageReceiver receiver = new StreamingMessageReceiver();
Subscriber subscriber = Subscriber.newBuilder(ProjectSubscriptionName.of(project, subscription), receiver)
.setParallelPullCount(1)
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build())
.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.build())
.build();
logger.debug("Starting subscriber...");
subscriber.startAsync();
logger.debug("Started, waiting for running...");
subscriber.awaitRunning();
logger.debug("Running");
subscriber.awaitTerminated();
logger.debug("Terminated");
}
public static void main(String[] args) {
doSubscribe();
logger.debug("done");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment