@chuwy
Created Jun 18, 2020
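// Assumes fs2 2.x and cats-effect 2.x (Timer, Queue#enqueue1). A Timer[IO] and a
// ContextShift[IO] must be in implicit scope, e.g. inside an IOApp.
import scala.concurrent.duration._
import cats.effect.{Concurrent, IO, Sync, Timer}
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.Queue

// Bursts of integers separated by one-second pauses.
// Even numbers become Right(i); odd numbers become Left(i.toString + "b").
def input[F[_]: Timer] =
  (Stream(1, 2, -3, 4, -5) ++
    Stream.sleep_(1.second) ++
    Stream(4, 3, -1, -2, 1) ++
    Stream.sleep_(1.second) ++
    Stream(0, 2, -5) ++
    Stream.sleep_(1.second) ++
    Stream(-6, 1, 2, 0, -1, -4, 2, 9)
  ).covary[F].map(i => if (i % 2 == 0) i.asRight else (i.toString + "b").asLeft)

// Prints negative values as "Good"; pushes all other values onto the queue with a "q" suffix.
def goodSink[F[_]: Sync](queue: Queue[F, String]): Pipe[F, Int, Unit] =
  _.evalMap { int =>
    if (int < 0) Sync[F].delay(println(s"Good $int"))
    else queue.enqueue1(int.toString + "q")
  }

// Merges the incoming Left values with whatever goodSink enqueued and prints each item as "Bad".
def badSink[F[_]: Concurrent](queue: Queue[F, String]): Pipe[F, String, Unit] =
  _.merge(queue.dequeue).evalMap { item => Sync[F].delay(println(s"Bad $item")) }

// Wires both sinks to a shared bounded queue (capacity 5) and drains the stream.
val action = for {
  q <- Queue.bounded[IO, String](5)
  _ <- input[IO].observeEither(badSink(q), goodSink(q)).compile.drain
} yield ()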