@kubukoz
Created March 15, 2021 17:39
Periodically show the number of elements produced by an fs2 stream
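import cats.effect.{Concurrent, Timer}
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.Pipe
import scala.concurrent.duration._

// Passes elements through unchanged while printing a running total once per second.
// Assumes cats-effect 2.x and fs2 2.x (Timer and cats.effect.concurrent.Ref).
def showProcessedCount[F[_]: Concurrent: Timer, A]: Pipe[F, A, A] = stream =>
  fs2.Stream.eval(Ref[F].of(0)).flatMap { count =>
    stream.chunks
      // Count whole chunks at a time to keep the per-element overhead low.
      .evalMap { chunk =>
        count.update(_ + chunk.size).as(chunk)
      }
      .flatMap(fs2.Stream.chunk)
      // The reporter runs in the background and is stopped when the main stream ends.
      .concurrently(
        fs2.Stream
          .awakeEvery[F](1.second)
          .zip(fs2.Stream.repeatEval(count.get))
          .map { case (elapsed, total) =>
            s"Processed $total elements in ${elapsed.toSeconds} seconds"
          }
          .showLinesStdOut
      )
  }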
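
A possible way to try the pipe out, sketched under the assumption of cats-effect 2.x with `IOApp` and with `showProcessedCount` from above in scope. The `Demo` object and the ticking source stream are made up for illustration and are not part of the original gist:

```scala
import cats.effect.{ExitCode, IO, IOApp}
import scala.concurrent.duration._

// Hypothetical demo app; assumes showProcessedCount (defined above) is in scope.
object Demo extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    fs2.Stream
      .awakeEvery[IO](100.millis) // illustrative source: one element every 100 ms
      .take(50)                   // stop after 50 elements (about 5 seconds)
      .through(showProcessedCount[IO, FiniteDuration])
      .compile
      .drain
      .as(ExitCode.Success)
}
```

While the source is running, this should print a "Processed ... elements in ... seconds" line roughly once per second. Because the reporter is attached with `concurrently`, it stops as soon as the source stream finishes, so the final total is not guaranteed to be printed.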