Created
January 2, 2017 11:39
-
-
Save gberche-orange/2d025427f99ce6257894fd206e7f4144 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static Throwable fetchLogsOrReturnError(DopplerClient dopplerClient, String applicationId, int retry) { | |
Throwable dopplerException = null; | |
Disposable subscribedLogs = null; | |
try { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final AtomicLong count = new AtomicLong(); | |
final AtomicReference<Throwable> errorReference = new AtomicReference<>(); | |
final AtomicReference<Long> lastTimeStamp = new AtomicReference<>(0L); | |
final AtomicReference<Long> firstTimeStamp = new AtomicReference<>(-1L); | |
long beforeFetchingLogsTs = System.nanoTime(); | |
subscribedLogs = dopplerClient.recentLogs(RecentLogsRequest.builder() | |
.applicationId(applicationId) | |
.build()) | |
.subscribe(envelope -> { | |
logEnvelope(envelope); | |
count.incrementAndGet(); | |
Long timestamp = envelope.getTimestamp(); | |
if (timestamp != null && timestamp > lastTimeStamp.get()) { | |
lastTimeStamp.set(timestamp); | |
} | |
if (timestamp != null && timestamp > 0 && firstTimeStamp.get() == -1L) { | |
firstTimeStamp.set(timestamp); | |
} | |
if (envelope.getLogMessage() == null) { | |
log.error("another type than LogMessage ?"); | |
} | |
}, | |
throwable -> { | |
errorReference.set(throwable); | |
latch.countDown(); | |
}, | |
latch::countDown); | |
long afterFetchingLogsTs = System.nanoTime(); | |
if (!latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)) { | |
throw new IllegalStateException("Subscriber timed out"); | |
} else { | |
long elapsedNs = afterFetchingLogsTs - beforeFetchingLogsTs; | |
long nbReadEnveloppes = count.get(); | |
Instant lastTsInst = Instant.ofEpochSecond(0, lastTimeStamp.get()); | |
Instant firstTsInst = Instant.ofEpochSecond(0, firstTimeStamp.get() == null ? 0L: firstTimeStamp.get()); | |
if (errorReference.get() != null) { | |
dopplerException = errorReference.get(); | |
log.debug("caught exception. retry #{}. got {} envelopes. 1st_ts={} last_ts={} elapsed={}ns Exception={}", retry, nbReadEnveloppes, firstTsInst, lastTsInst, dopplerException.toString(), | |
elapsedNs, | |
dopplerException); | |
} else { | |
long nsPerEnveloppes = nbReadEnveloppes ==0 ? 0 : elapsedNs / nbReadEnveloppes; | |
log.debug("got {} envelopes last_ts={} elapsed={}ns ns_per_enveloppe={}", nbReadEnveloppes, lastTsInst, elapsedNs, nsPerEnveloppes); | |
} | |
} | |
} catch (InterruptedException i) { | |
log.error("Interrupted while waiting for call result", i); | |
} finally { | |
if (subscribedLogs != null) { | |
subscribedLogs.dispose(); | |
} | |
} | |
return dopplerException; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment