Skip to content

Instantly share code, notes, and snippets.

Created July 2, 2019 10:24
Show Gist options
  • Save DamianReeves/4b8d65250964ffc6e56f3edf8fcd497a to your computer and use it in GitHub Desktop.
Save DamianReeves/4b8d65250964ffc6e56f3edf8fcd497a to your computer and use it in GitHub Desktop.
ZStream split
package zio.integration
package object stream extends StreamOps {
import zio._
object StreamOps extends StreamOps
trait StreamOps {
implicit class ZStreamOps[R,E,A](self:ZStream[R,E,A]) {
def split(p : A => UIO[Boolean]) : ZManaged[R, E, (ZStream[R,E,A],ZStream[R,E,A])] =
def split[B1,B2](queueCapacity : Int)(p : A => UIO[Boolean]) : ZManaged[R, E, (ZStream[R,E,A],ZStream[R,E,A])] = {
for {
leftQueue <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
rightQueue <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
folder = (_:Unit, item:A) => {
p(item).flatMap {
case true => leftQueue.offer(Take.Value(item)).fork.unit
case false => rightQueue.offer(Take.Value(item)).fork.unit
_ <- self.fold[R, E, A, Unit].flatMap { fold =>
fold((), _ => true, folder)
e => leftQueue.offer(Take.Fail(e)).fork.zipPar(rightQueue.offer(Take.Fail(e)).fork).unit.toManaged_,
_ => leftQueue.offer(Take.End).fork.zipPar(rightQueue.offer(Take.End).fork).unit.toManaged_
leftStream = ZStream.fromQueue[R,E,Take[E,A]](leftQueue).collectWhile {
case Take.Value(a) => a
rightStream = ZStream.fromQueue[R,E,Take[E,A]](rightQueue).collectWhile {
case Take.Value(a) => a
} yield (leftStream, rightStream)
import org.scalatest.{FlatSpec, Matchers}
import zio._
import com.typesafe.scalalogging.LazyLogging
class ZStreamSpec extends FlatSpec with Matchers with DefaultRuntime with LazyLogging {
behavior of "ZStreamOps"
it should("toQueue should work as expected") in {
val result = unsafeRun(
(for {
items <- Managed.succeed(1 to 10).map(_.toList)
stream <- Managed.succeed(Stream.fromIterable(items))
managedQueue = stream.toQueue()
managedQueue2 <- => Stream.fromQueue(queue))
} yield managedQueue2).use{ daStream =>
daStream.collectWhile {
case Take.Value(a) => a * 2
)"Result: $result")
behavior of "Splitting a ZStream"
it should("be possible to split a stream") in {
val result = unsafeRun(
(for {
items <- Managed.succeed(1 to 10).map(_.toList)
stream <- Managed.succeed(Stream.fromIterable(items))
streamz <- stream.split(n => ZIO.succeed(n % 2 == 0))
} yield streamz).use {
case (leftStream, rightStream) =>
val left =
leftStream.tap { item =>
UIO.succeed("Left: $item"))
val right =
rightStream.tap { item =>
UIO.succeed("Right: $item"))
result should contain theSameElementsAs List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment