Skip to content

Instantly share code, notes, and snippets.

@guidoschmidt17
Last active December 17, 2022 11:15
Show Gist options
  • Save guidoschmidt17/00c68e2dea07a9a6296332f9fb20d1db to your computer and use it in GitHub Desktop.
Save guidoschmidt17/00c68e2dea07a9a6296332f9fb20d1db to your computer and use it in GitHub Desktop.
...
def loopResult(out: Queue[Take[StreamingError, B]], offsetRef: Ref[Long], n: Long) =
def resultStream(offset: Long, limit: Long) =
sessionPool
.use(session => {
session
.prepare(query)
.toScopedZIO
.flatMap(query =>
ZIO
.succeed(
query
.stream(args ~ offset ~ limit, chunksize)
.chunkLimit(chunksize)
.evalTap(b => offsetRef.update(_ + b.size) *> out.offer(Take.chunk(Chunk.from(b.toList))))
++ fs2.Stream.eval(out.offer(Take.end))
)
)
})
for
offset <- offsetRef.get
result <- resultStream(offset, n - offset)
_ <- result
.handleErrorWith(e => fs2.Stream.eval(ZIO.fail(e)).drain)
.compile[RIO[Any, _], RIO[Any, _], Any]
.resource
.drain
.toScopedZIO
yield ()
val result =
for
maxcount <- if check.isDefined then checkResult else ZIO.succeed(Long.MaxValue)
n = math.min(maxcount, if limit < 1 then Long.MaxValue else limit)
offset: Ref[Long] <- Ref.make(0L)
out: Queue[Take[StreamingError, B]] <- Queue.bounded(chunksize)
outstream = ZStream
.fromQueue(out, out.capacity)
.flattenTake
_ <- loopResult(out, offset, n)
.retry(DefaultRetrySchedule)
.forkScoped
yield outstream
result.catchAll(catchAllErrors(message))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment