Skip to content

Instantly share code, notes, and snippets.

@DamianReeves
Created July 2, 2019 10:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DamianReeves/4b8d65250964ffc6e56f3edf8fcd497a to your computer and use it in GitHub Desktop.
Save DamianReeves/4b8d65250964ffc6e56f3edf8fcd497a to your computer and use it in GitHub Desktop.
ZStream split
package zio.integration
/**
 * Package object for `zio.integration.stream`.
 *
 * It inherits everything declared in `StreamOps`, so the `ZStream` extension
 * syntax is visible to code inside this package and to anyone importing
 * `zio.integration.stream._`.
 */
package object stream extends StreamOps
package zio.integration.stream
import zio._
import zio.stream._
/**
 * Importable instance of the `StreamOps` trait: `import StreamOps._` brings the
 * implicit `ZStreamOps` syntax declared in the trait into scope.
 */
object StreamOps extends StreamOps
/**
 * Extension syntax for `ZStream`.
 *
 * Mix this trait in (or import the members of an object extending it) to make
 * the `split` operators available on any `ZStream`.
 */
trait StreamOps {

  /**
   * Adds predicate-based splitting to a stream.
   *
   * @param self the stream being extended
   */
  implicit class ZStreamOps[R, E, A](self: ZStream[R, E, A]) {

    /**
     * Splits this stream in two using an effectful predicate, buffering up to
     * two elements per side.
     *
     * @param p decides the side of each element: `true` goes to the first
     *          (left) stream, `false` to the second (right) stream
     * @return a managed pair `(left, right)`; see the overload taking a
     *         `queueCapacity` for the full contract
     */
    def split(p: A => UIO[Boolean]): ZManaged[R, E, (ZStream[R, E, A], ZStream[R, E, A])] =
      split(2)(p)

    /**
     * Splits this stream in two using an effectful predicate.
     *
     * The source stream is drained by a background fiber that routes every
     * element into one of two bounded queues; the returned streams read from
     * those queues. Within each side, elements keep the order in which the
     * source emitted them.
     *
     * Offers are performed by the draining fiber itself (not by forked
     * fibers), so the bounded queues apply backpressure: once either queue is
     * full the source is not pulled any further until that side is consumed.
     * Both returned streams should therefore be consumed concurrently (for
     * example via `merge`); draining one side to completion before touching
     * the other can stall when the untouched side fills up.
     *
     * A failure of the source stream is delivered to both returned streams,
     * which then fail with the same cause.
     *
     * Releasing the managed resource interrupts the draining fiber and shuts
     * both queues down.
     *
     * @tparam B1 unused; retained only for source compatibility
     * @tparam B2 unused; retained only for source compatibility
     * @param queueCapacity capacity of each of the two internal queues
     * @param p decides the side of each element: `true` goes to the first
     *          (left) stream, `false` to the second (right) stream
     * @return a managed pair `(left, right)` of streams
     */
    def split[B1, B2](queueCapacity: Int)(
      p: A => UIO[Boolean]
    ): ZManaged[R, E, (ZStream[R, E, A], ZStream[R, E, A])] = {

      // Translates a queued Take back into stream terms: a value is emitted,
      // a failure is re-raised, and End becomes None so the stream can stop.
      val decode: Take[E, A] => IO[E, Option[A]] = {
        case Take.Value(a)    => ZIO.succeed(Some(a))
        case Take.Fail(cause) => ZIO.halt(cause)
        case Take.End         => ZIO.succeed(None)
      }

      // Stream over one side's queue that ends on Take.End and fails on Take.Fail.
      def drain(queue: Queue[Take[E, A]]): ZStream[R, E, A] =
        ZStream
          .fromQueue[R, E, Take[E, A]](queue)
          .mapM(decode)
          .collectWhile { case Some(a) => a }

      for {
        leftQueue  <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
        rightQueue <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
        // Route each element sequentially. Offering inline (no fork) preserves
        // ordering and lets a full queue suspend the producer.
        folder = (_: Unit, item: A) =>
          p(item).flatMap {
            case true  => leftQueue.offer(Take.Value(item)).unit
            case false => rightQueue.offer(Take.Value(item)).unit
          }
        _ <- self.fold[R, E, A, Unit].flatMap { fold =>
              fold((), _ => true, folder)
                .foldCauseM(
                  // Signal both sides in parallel so that one full queue does
                  // not delay the terminal signal reaching the other.
                  cause =>
                    leftQueue
                      .offer(Take.Fail(cause))
                      .zipPar(rightQueue.offer(Take.Fail(cause)))
                      .unit
                      .toManaged_,
                  _ =>
                    leftQueue
                      .offer(Take.End)
                      .zipPar(rightQueue.offer(Take.End))
                      .unit
                      .toManaged_
                )
                .unit
                .fork
            }
      } yield (drain(leftQueue), drain(rightQueue))
    }
  }
}
package zio.integration.stream
import org.scalatest.{FlatSpec, Matchers}
import zio._
import zio.stream._
import com.typesafe.scalalogging.LazyLogging
/**
 * Tests for queue-backed streams and for the `split` extension on `ZStream`.
 */
class ZStreamSpec extends FlatSpec with Matchers with DefaultRuntime with LazyLogging {

  behavior of "ZStreamOps"

  // Round-trips a stream through `toQueue` / `fromQueue` and doubles every value.
  it should("toQueue should work as expected") in {
    val result = unsafeRun(
      (for {
        items  <- Managed.succeed(1 to 10).map(_.toList)
        stream <- Managed.succeed(Stream.fromIterable(items))
        queue  <- stream.toQueue()
      } yield Stream.fromQueue(queue)).use { takes =>
        takes.collectWhile {
          case Take.Value(a) => a * 2
        }.runCollect
      }
    )
    logger.info(s"Result: $result")
    // Previously this test only logged its result; pin the expected output so
    // that it can actually fail.
    result should contain theSameElementsInOrderAs List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
  }

  behavior of "Splitting a ZStream"

  // Splits 1..10 into evens (left) and odds (right), then merges both sides back.
  it should("be possible to split a stream") in {
    val result = unsafeRun(
      (for {
        items   <- Managed.succeed(1 to 10).map(_.toList)
        stream  <- Managed.succeed(Stream.fromIterable(items))
        streamz <- stream.split(n => ZIO.succeed(n % 2 == 0))
      } yield streamz).use {
        case (leftStream, rightStream) =>
          val left =
            leftStream.tap { item =>
              UIO.succeed(logger.info(s"Left: $item"))
            }
          val right =
            rightStream.tap { item =>
              UIO.succeed(logger.info(s"Right: $item"))
            }
          // Both sides are consumed concurrently through `merge`.
          left.merge(right).runCollect
      }
    )
    // The interleaving of the merged sides is not deterministic, so only the
    // contents are compared, not the order.
    result should contain theSameElementsAs List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment