Skip to content

Instantly share code, notes, and snippets.

@gberche-orange
Created January 2, 2017 11:39
Show Gist options
  • Save gberche-orange/2d025427f99ce6257894fd206e7f4144 to your computer and use it in GitHub Desktop.
Save gberche-orange/2d025427f99ce6257894fd206e7f4144 to your computer and use it in GitHub Desktop.
/**
 * Fetches the recent logs of an application through Doppler, blocks until the stream
 * terminates, and reports the outcome.
 *
 * <p>Every received envelope is passed to {@code logEnvelope}; envelope count, first/last
 * timestamps and elapsed time are written to the debug log once the stream terminates.
 *
 * @param dopplerClient client used to request the recent logs
 * @param applicationId id of the application whose recent logs are requested
 * @param retry         retry counter, only used in the diagnostic log message
 * @return the error signalled by the log stream, or {@code null} when the stream completed
 *         normally (also {@code null} when the calling thread was interrupted while waiting;
 *         in that case the thread's interrupt flag is restored)
 * @throws IllegalStateException if the stream does not terminate within {@code TIMEOUT}
 */
private static Throwable fetchLogsOrReturnError(DopplerClient dopplerClient, String applicationId, int retry) {
    Throwable dopplerException = null;
    Disposable subscribedLogs = null;
    try {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicLong count = new AtomicLong();
        final AtomicReference<Throwable> errorReference = new AtomicReference<>();
        final AtomicLong lastTimeStamp = new AtomicLong(0L);
        // -1 is the "no timestamp seen yet" sentinel.
        final AtomicLong firstTimeStamp = new AtomicLong(-1L);

        long beforeFetchingLogsTs = System.nanoTime();
        subscribedLogs = dopplerClient.recentLogs(RecentLogsRequest.builder()
                .applicationId(applicationId)
                .build())
            .subscribe(envelope -> {
                    logEnvelope(envelope);
                    count.incrementAndGet();
                    Long timestamp = envelope.getTimestamp();
                    if (timestamp != null) {
                        // Keep the greatest timestamp seen so far.
                        lastTimeStamp.accumulateAndGet(timestamp, Math::max);
                        if (timestamp > 0) {
                            // Only the first positive timestamp replaces the sentinel.
                            firstTimeStamp.compareAndSet(-1L, timestamp);
                        }
                    }
                    if (envelope.getLogMessage() == null) {
                        log.error("another type than LogMessage ?");
                    }
                },
                throwable -> {
                    errorReference.set(throwable);
                    latch.countDown();
                },
                latch::countDown);

        if (!latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)) {
            throw new IllegalStateException("Subscriber timed out");
        }

        // Measure after the stream has terminated so the elapsed time covers the actual
        // log retrieval and not merely the subscribe() call.
        long elapsedNs = System.nanoTime() - beforeFetchingLogsTs;
        long nbReadEnveloppes = count.get();
        // NOTE(review): timestamps are interpreted as nanoseconds since the epoch — confirm
        // against the Doppler envelope contract.
        Instant lastTsInst = Instant.ofEpochSecond(0, lastTimeStamp.get());
        long firstTs = firstTimeStamp.get();
        Instant firstTsInst = Instant.ofEpochSecond(0, firstTs == -1L ? 0L : firstTs);

        dopplerException = errorReference.get();
        if (dopplerException != null) {
            // Argument order matches the placeholders; the trailing throwable carries the stack trace.
            log.debug("caught exception. retry #{}. got {} envelopes. 1st_ts={} last_ts={} elapsed={}ns Exception={}",
                retry, nbReadEnveloppes, firstTsInst, lastTsInst, elapsedNs, dopplerException.toString(),
                dopplerException);
        } else {
            long nsPerEnveloppes = nbReadEnveloppes == 0 ? 0 : elapsedNs / nbReadEnveloppes;
            log.debug("got {} envelopes last_ts={} elapsed={}ns ns_per_enveloppe={}",
                nbReadEnveloppes, lastTsInst, elapsedNs, nsPerEnveloppes);
        }
    } catch (InterruptedException i) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        log.error("Interrupted while waiting for call result", i);
    } finally {
        if (subscribedLogs != null) {
            subscribedLogs.dispose();
        }
    }
    return dopplerException;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment