Skip to content

Instantly share code, notes, and snippets.

@jonaslindmark
Created October 22, 2020 07:46
Show Gist options
  • Save jonaslindmark/19cf4c3f6690a0b2e98af79f8d427db9 to your computer and use it in GitHub Desktop.
Save jonaslindmark/19cf4c3f6690a0b2e98af79f8d427db9 to your computer and use it in GitHub Desktop.
package com.nope.maybe;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
/**
 * Stand-alone Pub/Sub experiment.
 *
 * <p>Creates a topic and a pull subscription, starts a {@link Subscriber} configured with a
 * three-hour maximum ack-extension period and five-second individual extensions, publishes a
 * single message, and then prints the elapsed running time every 15 seconds forever.
 *
 * <p>The receiver nacks every delivery until the first 15-second sleep has elapsed; after that it
 * switches to "slow" mode and holds each delivery for two hours before acking it.
 */
public class App
{
    /**
     * Runs the experiment. Never returns normally: the status loop at the end is endless, so the
     * process has to be killed from the outside.
     *
     * @param args ignored
     * @throws Exception if any Pub/Sub call fails or the thread is interrupted while waiting
     */
    public static void main( String[] args ) throws Exception {
        var receiver = new Receiver();
        var project = "CHANGEME";

        // The admin clients are only needed while the topic and subscription are created, so they
        // are closed right afterwards instead of being leaked for the life of the process.
        String topicName;
        String subscriptionName;
        try (var topicClient = TopicAdminClient.create();
             var subClient = SubscriptionAdminClient.create()) {
            var topic = topicClient.createTopic(
                    String.format("projects/%s/topics/dontmindmelindmarktest", project));
            topicName = topic.getName();
            // Default PushConfig (i.e. a pull subscription) with an ack deadline of 10.
            var sub = subClient.createSubscription(
                    String.format("projects/%s/subscriptions/dontmindmeneither", project),
                    topicName,
                    PushConfig.getDefaultInstance(),
                    10);
            subscriptionName = sub.getName();
        }

        var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
                .setMaxAckExtensionPeriod(Duration.ofHours(3))
                .setMaxDurationPerAckExtension(Duration.ofSeconds(5))
                .build();
        subscriber.startAsync().awaitRunning();
        System.out.println("Started running");

        // Exactly one message is published; release the publisher's resources once it is sent.
        var publisher = Publisher.newBuilder(topicName).build();
        try {
            publisher.publish(
                    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("hello")).build())
                    .get();
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }

        var start = Instant.now(Clock.systemUTC()).getEpochSecond();
        while (true) {
            var duration = Instant.now(Clock.systemUTC()).getEpochSecond() - start;
            System.out.println("Running since " + Duration.ofSeconds(duration));
            Thread.sleep(15_000);
            // From the end of the first sleep onwards the receiver stops nacking and starts
            // holding messages instead.
            receiver.beSlow = true;
        }
    }

    /**
     * Message receiver with two modes, selected by {@link #beSlow}: immediate nack while the flag
     * is {@code false}, ack delayed by two hours once it is {@code true}.
     */
    private static class Receiver implements MessageReceiver {
        /** Mode switch written by {@code main}; volatile so receiver threads see the update. */
        public volatile boolean beSlow = false;

        /** Single thread that runs the delayed acks. */
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        /**
         * Logs the message id, then either nacks right away (fast mode) or schedules an ack two
         * hours in the future (slow mode).
         *
         * @param pubsubMessage the delivered message; only its id is read
         * @param ackReplyConsumer handle used to ack or nack this delivery
         */
        @Override
        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            System.out.println("I got " + pubsubMessage.getMessageId());
            if (beSlow) {
                System.out.println("im very very slow");
                scheduler.schedule(
                        () -> {
                            System.out.println("finally time to ack!");
                            try {
                                ackReplyConsumer.ack();
                            } catch (Exception e) {
                                // Report and carry on: an exception escaping the task would
                                // otherwise be captured silently by the scheduled future.
                                e.printStackTrace();
                            }
                        },
                        2,
                        TimeUnit.HOURS);
            } else {
                System.out.println("Im fast!");
                // Deliberate nack: the message is handed back rather than acknowledged.
                ackReplyConsumer.nack();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment