Skip to content

Instantly share code, notes, and snippets.

@dinvlad
Last active September 18, 2018 14:11
Show Gist options
  • Save dinvlad/0fbb6c0f27bcf0474c7a5f7a4a239df2 to your computer and use it in GitHub Desktop.
Save dinvlad/0fbb6c0f27bcf0474c7a5f7a4a239df2 to your computer and use it in GitHub Desktop.
Workaround for listening on PubSub subscriptions even when StreamingPull stops silently
// ... other dependencies ...
import com.google.api.core.ApiService;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.Subscription;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
/**
 * Pub/Sub helper service.
 *
 * <p>The visible part of this class keeps a subscription listener alive by
 * periodically tearing down and recreating its {@link Subscriber}, and by
 * recreating it after a short delay whenever it reports a failure.
 */
@Service
public class PubSubService {
    // ... other code ...

    /** Delay before a failed subscriber is replaced by a fresh one. */
    private static final long listenerErrorRetryDelayMs = 5000; // 5 seconds

    /** Maximum lifetime of a healthy subscriber before it is proactively replaced. */
    private static final long listenerRestartIntervalMs = 300000; // 5 minutes

    /**
     * Sleeps for up to {@code ms} milliseconds.
     *
     * <p>Interruption is deliberately swallowed: the method simply returns early
     * and the thread's interrupt flag is left cleared. Code that must honor
     * interruption should call {@link Thread#sleep(long)} directly instead.
     *
     * @param ms maximum time to sleep, in milliseconds
     */
    private static void sleep(final long ms) {
        try {
            Thread.sleep(ms);
        } catch (final InterruptedException ignored) {
            // Intentionally ignored; see the method comment.
        }
    }

    /**
     * Listens on {@code subscription} until the calling thread is interrupted.
     *
     * <p>Each loop iteration builds and starts a new {@link Subscriber}, then
     * waits until either the subscriber reports a failure or
     * {@code listenerRestartIntervalMs} elapses. The subscriber is then stopped
     * and replaced; after a failure the replacement is delayed by
     * {@code listenerErrorRetryDelayMs}.
     *
     * <p>This method blocks the calling thread. Interrupting that thread stops
     * the current subscriber and makes the method return with the interrupt
     * flag set.
     *
     * @param subscription the subscription whose name is used to build each subscriber
     * @param receiver     the callback handed to every subscriber that is created
     */
    private void createSubscriberWithListener(
        final Subscription subscription,
        final MessageReceiver receiver
    ) {
        while (!Thread.currentThread().isInterrupted()) {
            // One latch per subscriber: a late callback from an old subscriber
            // can never affect the wait on a newer one.
            final CountDownLatch failure = new CountDownLatch(1);
            final Subscriber subscriber = Subscriber
                .newBuilder(subscription.getName(), receiver)
                .build();
            subscriber.addListener(
                new Subscriber.Listener() {
                    @Override
                    public void failed(final Subscriber.State from, final Throwable t) {
                        // Runs on the thread that reports the state change
                        // (direct executor), so it must not block: just signal.
                        failure.countDown();
                    }
                },
                MoreExecutors.directExecutor()
            );

            boolean subscriberFailed;
            try {
                subscriber.startAsync().awaitRunning();
                // Returns true as soon as the subscriber fails, or false once
                // the regular restart interval has elapsed.
                subscriberFailed = failure.await(listenerRestartIntervalMs, TimeUnit.MILLISECONDS);
            } catch (final IllegalStateException startupFailure) {
                // awaitRunning() throws when the subscriber cannot reach RUNNING.
                // Treat it like any other failure so the loop keeps retrying.
                subscriberFailed = true;
            } catch (final InterruptedException interrupted) {
                // A real interrupt is a request to stop listening.
                stopQuietly(subscriber);
                Thread.currentThread().interrupt();
                return;
            }

            stopQuietly(subscriber);

            if (subscriberFailed) {
                try {
                    Thread.sleep(listenerErrorRetryDelayMs);
                } catch (final InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Stops {@code subscriber} and waits for it to finish, tolerating a
     * subscriber that has failed instead of terminating normally.
     *
     * @param subscriber the subscriber to shut down
     */
    private static void stopQuietly(final Subscriber subscriber) {
        try {
            subscriber.stopAsync().awaitTerminated();
        } catch (final IllegalStateException alreadyFailed) {
            // awaitTerminated() throws if the subscriber ended up FAILED rather
            // than TERMINATED; there is nothing further to shut down.
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment