Created
July 2, 2019 10:24
-
-
Save DamianReeves/4b8d65250964ffc6e56f3edf8fcd497a to your computer and use it in GitHub Desktop.
ZStream split
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio.integration | |
/**
 * Package object for `zio.integration.stream`.
 *
 * It has no members of its own; it only mixes in `StreamOps`, so whatever that
 * trait declares becomes a member of this package.
 */
package object stream extends StreamOps
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio.integration.stream | |
import zio._ | |
import zio.stream._ | |
/** Importable instance of [[StreamOps]]; `import StreamOps._` brings the stream syntax into scope. */
object StreamOps extends StreamOps

/** Extension syntax for `ZStream`. Mix this trait in (or import from its companion) to enable it. */
trait StreamOps {

  /**
   * Adds `split` to any `ZStream[R, E, A]`.
   *
   * @param self the stream being extended
   */
  implicit class ZStreamOps[R, E, A](self: ZStream[R, E, A]) {

    /**
     * Splits this stream in two using `p`, buffering at most 2 elements per side.
     *
     * Shorthand for `split(2)(p)`; see the two-argument overload for the full contract.
     */
    def split(p: A => UIO[Boolean]): ZManaged[R, E, (ZStream[R, E, A], ZStream[R, E, A])] =
      split(2)(p)

    /**
     * Splits this stream into a pair of streams according to an effectful predicate.
     *
     * The source is run on a forked fiber. Every element for which `p` yields `true`
     * is pushed to the first (left) stream, every other element to the second (right)
     * one. When the source ends, both output streams end; when the source fails, both
     * output streams fail with the same cause.
     *
     * Each side is backed by a bounded queue that is shut down when the managed
     * resource is released. Elements are offered sequentially, so per-side ordering
     * follows the source and a full queue back-pressures the producer. As a
     * consequence both output streams must be consumed concurrently (for example via
     * `merge`): draining only one side stalls the producer as soon as the other
     * side's queue is full.
     *
     * @tparam B1 unused; retained only so existing call sites keep compiling
     * @tparam B2 unused; retained only so existing call sites keep compiling
     * @param queueCapacity capacity of each of the two bounded queues
     * @param p             effectful predicate choosing the side: `true` = left, `false` = right
     * @return a managed pair `(left, right)` of output streams
     */
    def split[B1, B2](queueCapacity: Int)(p: A => UIO[Boolean]): ZManaged[R, E, (ZStream[R, E, A], ZStream[R, E, A])] = {

      // Turns a queue of Takes back into a plain stream: values are emitted, End
      // terminates the stream, and Fail is re-raised so the consumer sees the error
      // instead of a silently truncated stream.
      def drain(queue: Queue[Take[E, A]]): ZStream[R, E, A] = {
        val decode: Take[E, A] => ZIO[R, E, Option[A]] = {
          case Take.Value(a)    => UIO.succeed(Some(a))
          case Take.Fail(cause) => ZIO.halt(cause)
          case Take.End         => UIO.succeed(None)
        }
        ZStream
          .fromQueue[R, E, Take[E, A]](queue)
          .mapM(decode)
          .collectWhile { case Some(a) => a }
      }

      for {
        leftQueue  <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
        rightQueue <- ZManaged.make(Queue.bounded[Take[E, A]](queueCapacity))(_.shutdown)
        // Offers are deliberately NOT forked: forking each offer lets the terminal
        // End/Fail signal overtake values that are still waiting to be enqueued.
        route = (_: Unit, item: A) =>
          p(item).flatMap {
            case true  => leftQueue.offer(Take.Value(item)).unit
            case false => rightQueue.offer(Take.Value(item)).unit
          }
        _ <- self.fold[R, E, A, Unit].flatMap { fold =>
          fold((), _ => true, route)
            .foldCauseM(
              // The terminal signal goes to both sides in parallel so that one full
              // queue cannot delay the notification of the other side.
              e => leftQueue.offer(Take.Fail(e)).zipPar(rightQueue.offer(Take.Fail(e))).unit.toManaged_,
              _ => leftQueue.offer(Take.End).zipPar(rightQueue.offer(Take.End)).unit.toManaged_
            )
            .unit
            .fork
        }
      } yield (drain(leftQueue), drain(rightQueue))
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio.integration.stream | |
import org.scalatest.{FlatSpec, Matchers} | |
import zio._ | |
import zio.stream._ | |
import com.typesafe.scalalogging.LazyLogging | |
/**
 * Specs for `ZStream#toQueue` and for the `split` extension method.
 *
 * Effects are executed synchronously with `unsafeRun`, and intermediate values
 * are logged through the mixed-in `logger`.
 */
class ZStreamSpec extends FlatSpec with Matchers with DefaultRuntime with LazyLogging {

  behavior of "ZStreamOps"

  it should("toQueue should work as expected") in {
    val result = unsafeRun(
      (for {
        items         <- Managed.succeed(1 to 10).map(_.toList)
        stream        <- Managed.succeed(Stream.fromIterable(items))
        managedQueue   = stream.toQueue()
        managedQueue2 <- managedQueue.map(queue => Stream.fromQueue(queue))
      } yield managedQueue2).use { daStream =>
        // Unwrap the Takes coming out of the queue, doubling each value; the first
        // non-Value take (the end marker) stops the collection.
        daStream.collectWhile {
          case Take.Value(a) => a * 2
        }.runCollect
      }
    )
    logger.info(s"Result: $result")
    // Without an assertion this test could never fail on wrong output.
    result should contain theSameElementsInOrderAs List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
  }

  behavior of "Splitting a ZStream"

  it should("be possible to split a stream") in {
    val result = unsafeRun(
      (for {
        items   <- Managed.succeed(1 to 10).map(_.toList)
        stream  <- Managed.succeed(Stream.fromIterable(items))
        streamz <- stream.split(n => ZIO.succeed(n % 2 == 0))
      } yield streamz).use {
        case (leftStream, rightStream) =>
          // Tag every element with the side it arrived on, so the assertions below
          // can check the routing and not just the merged contents.
          val left =
            leftStream.tap { item =>
              UIO.succeed(logger.info(s"Left: $item"))
            }.map(item => Left(item): Either[Int, Int])
          val right =
            rightStream.tap { item =>
              UIO.succeed(logger.info(s"Right: $item"))
            }.map(item => Right(item): Either[Int, Int])
          // merge consumes both sides concurrently
          left.merge(right).runCollect
      }
    )
    val fromLeft  = result.collect { case Left(n) => n }
    val fromRight = result.collect { case Right(n) => n }
    // The predicate is `n % 2 == 0`: evens must arrive on the left, odds on the right.
    // Together the two checks also cover every element of 1 to 10.
    fromLeft should contain theSameElementsAs List(2, 4, 6, 8, 10)
    fromRight should contain theSameElementsAs List(1, 3, 5, 7, 9)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment