Last active
October 23, 2018 04:37
-
-
Save jilen/35f8d88c3b06cb602523c363e0c380f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Adds size-based grouping to a `Stream[F, A]`.
  *
  * Elements are routed through a queue of queues: the outer queue carries one
  * inner queue per group, and each inner queue carries the chunks belonging to
  * that group. `None` is used as the end-of-stream marker on both levels.
  */
implicit class StreamSyntax[F[_], A](s: Stream[F, A])(
    implicit F: Concurrent[F]) {

  /** Builds a pipe that splits its input into consecutive sub-streams of `n`
    * elements each (the last one may be shorter).
    *
    * NOTE: the first group queue is published before any input is read, and a
    * fresh queue is published every time a group fills up, so an empty input
    * or an input whose length is a multiple of `n` still produces a trailing
    * empty sub-stream.
    *
    * @param lastQRef reference that is overwritten with the most recently
    *                 created group queue; it is read once the input ends in
    *                 order to terminate the final group
    * @param n        number of elements per group; must be positive
    * @return a pipe emitting one inner stream per group
    */
  def groupedPipe(
      lastQRef: Ref[F, Queue[F, Option[Chunk[A]]]],
      n: Int): Pipe[F, A, Stream[F, A]] = {
    // A non-positive size has no meaningful grouping and would prevent the
    // splitting loop below from ever making progress.
    require(n > 0, s"n must be positive, but was $n")
    in =>
      // Create the outer queue together with the first group queue, record the
      // group queue in `lastQRef`, and publish it to the consumer side.
      val initQs =
        Queue.unbounded[F, Option[Queue[F, Option[Chunk[A]]]]].flatMap { qq =>
          Queue.bounded[F, Option[Chunk[A]]](1).flatMap { q =>
            lastQRef.set(q) *> qq.enqueue1(Some(q)).as(qq -> q)
          }
        }
      Stream.eval(initQs).flatMap {
        case (qq, initQ) =>
          // Opens the next group: publish a fresh queue and remember it as the
          // latest one.
          def newQueue: F[Queue[F, Option[Chunk[A]]]] =
            Queue.bounded[F, Option[Chunk[A]]](1).flatMap { q =>
              qq.enqueue1(Some(q)) *> lastQRef.set(q).as(q)
            }

          // Two terminators are written per group: the consumer-facing inner
          // stream stops at the first `None`, and the drain executed by its
          // finalizer stops at a `None` as well.
          def closeGroup(q: Queue[F, Option[Chunk[A]]]): F[Unit] =
            q.enqueue1(None) *> q.enqueue1(None)

          // Writes `c` into the open group `q`, which already holds `filled`
          // elements. Whenever the group reaches `n` elements it is closed and
          // the remainder is carried over to a new group; this repeats until
          // the remainder fits, so a chunk larger than `n` is spread over as
          // many groups as needed. Returns the fill level and queue of the
          // group left open.
          def distribute(
              filled: Int,
              q: Queue[F, Option[Chunk[A]]],
              c: Chunk[A]): F[(Int, Queue[F, Option[Chunk[A]]])] =
            if (filled + c.size < n) {
              q.enqueue1(Some(c)).as((filled + c.size, q))
            } else {
              val (l, r) = c.splitAt(n - filled)
              q.enqueue1(Some(l)) >> closeGroup(q) >> newQueue.flatMap { nq =>
                distribute(0, nq, r)
              }
            }

          // Producer side: feed every input chunk into the group queues, then
          // terminate the last group and the outer queue.
          val evalStream = {
            in.chunks
              .evalMapAccumulate((0, initQ)) {
                case ((filled, q), c) =>
                  distribute(filled, q, c).map(state => (state, c))
              } ++ Stream.eval {
              lastQRef.get.flatMap { last =>
                closeGroup(last)
              } *> qq.enqueue1(None)
            }
          }

          // Consumer side: expose each group queue as an inner stream. When an
          // inner stream finalizes, whatever is left in its queue up to the
          // next `None` is drained.
          qq.dequeue.unNoneTerminate
            .map { q =>
              q.dequeue.unNoneTerminate
                .flatMap(Stream.chunk)
                .onFinalize(
                  q.dequeueChunk(Int.MaxValue).unNoneTerminate.compile.drain)
            }
            .concurrently(evalStream)
      }
  }

  /** Splits the wrapped stream into consecutive sub-streams of `n` elements.
    *
    * Allocates the reference required by [[groupedPipe]] (seeded with a
    * placeholder queue that `groupedPipe` replaces before reading any input)
    * and runs the wrapped stream through the resulting pipe.
    *
    * @param n number of elements per group; must be positive
    */
  def grouped(n: Int): Stream[F, Stream[F, A]] = {
    Stream.eval {
      Queue.unbounded[F, Option[Chunk[A]]].flatMap { empty =>
        Ref.of[F, Queue[F, Option[Chunk[A]]]](empty)
      }
    }.flatMap { ref =>
      val p = groupedPipe(ref, n)
      s.through(p)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Basically, this enqueues stream elements into a queue of queues.