Skip to content

Instantly share code, notes, and snippets.

@rzeigler
Created October 4, 2020 20:19
Show Gist options
  • Save rzeigler/3ca69b4fad498af97751e7481dc4ab42 to your computer and use it in GitHub Desktop.
Save rzeigler/3ca69b4fad498af97751e7481dc4ab42 to your computer and use it in GitHub Desktop.
/**
 * Pipe that forwards every element unchanged while a background watchdog
 * fails the stream if no chunk has been observed for too long.
 *
 * Each chunk pulled through the pipe records the current time in a `Ref`.
 * Concurrently, a fixed-rate watchdog compares the current time with the
 * last recorded chunk time and raises a `RuntimeException` in the output
 * stream once the gap exceeds the stall timeout (30 seconds).
 *
 * @tparam F effect type; `Concurrent` is needed to run the watchdog alongside
 *           the source, `Timer` to drive the fixed-rate ticks
 * @tparam A element type, passed through untouched
 * @return a pipe emitting the same elements as its input, failing with
 *         `RuntimeException("pipeline appears to have stalled")` on a stall
 */
private def detectStall[F[_]: Concurrent: Timer, A]: Pipe[F, A, A] = {
  // Used both as the watchdog period and as the longest tolerated gap between chunks.
  val stallTimeout = 30.seconds
  val currentTime = Sync[F].delay(OffsetDateTime.now())
  (stream: Stream[F, A]) =>
    Stream.eval(currentTime).flatMap { startTime =>
      // Seeded with the start time so a source that never emits is also detected.
      Stream.eval(Ref[F].of(startTime)).flatMap { lastChunkSeen =>
        val watchdog = Stream
          .fixedRate(stallTimeout)
          .evalMap(_ => (currentTime, lastChunkSeen.get).tupled)
          .ensure(new RuntimeException("pipeline appears to have stalled")) {
            case (now, lastChunk) =>
              // `ensure` raises the error when the predicate is FALSE, so the
              // predicate must describe the healthy state: the last chunk is
              // not older than the stall timeout.
              !now.minusSeconds(stallTimeout.toSeconds).isAfter(lastChunk)
          }
        stream
          .evalTapChunk(_ => currentTime.flatMap(lastChunkSeen.set))
          .concurrently(watchdog)
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment