Skip to content

Instantly share code, notes, and snippets.

@derekjw
Last active February 28, 2018 11:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save derekjw/8ccab15a78193e18eafcef235216e01d to your computer and use it in GitHub Desktop.
Save derekjw/8ccab15a78193e18eafcef235216e01d to your computer and use it in GitHub Desktop.
Porting some akka-stream combinators to fs2
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global
// Demo: emit 0..95 after random delays (at most 16 delayed streams open at once),
// split the numbers into groups by `n % 16`, keep a running sum per group, and
// print every (runningSum, index) pair as it is produced.
Scheduler[IO](2)
  .flatMap { timer =>
    // Re-emit a single number after a random delay below one second.
    val delayed: Int => Stream[IO, Int] =
      value => timer.delay[IO, Int](Stream.emit(value), Random.nextInt(1000).millis)

    Stream
      .range(0, 96)
      .covary[IO]
      .through(flatMapAsync[IO, Int, Int](16)(delayed))
  }
  .onFinalize(IO(println("done")))
  .through(
    groupBy[IO, Int, (Int, Int), Int](_ % 16) { key =>
      // Running total per group, tagged with the group's key.
      _.scan((key, 0)) { case ((group, total), n) => (group, total + n) }
    }
  )
  .zipWithIndex
  .observe1(indexed => IO(println(indexed)))
  .compile
  .drain
  .unsafeRunSync()
/**
 * Turns every input element into a stream using `f` and merges the resulting
 * streams into one output stream via `join`.
 *
 * @param parallelism passed to `join` as the limit on inner streams open at once
 * @param f           builds the inner stream for one input element
 * @return a pipe emitting the elements of all inner streams
 */
def flatMapAsync[F[_]: Effect, I, O](parallelism: Int)(f: I => Stream[F, O])(implicit executionContext: ExecutionContext): Pipe[F, I, O] = { upstream =>
  val innerStreams: Stream[F, Stream[F, O]] = upstream.map(f)
  innerStreams.join(parallelism)
}
/**
 * Effectful variant of [[flatMapAsync]]: runs `f` for every input element and
 * emits the single result of each effect.
 *
 * @param parallelism forwarded unchanged to `flatMapAsync`
 * @param f           effect to evaluate for one input element
 * @return a pipe emitting the results of the evaluated effects
 */
def mapEvalAsync[F[_]: Effect, I, O](parallelism: Int)(f: I => F[O])(implicit executionContext: ExecutionContext): Pipe[F, I, O] =
  flatMapAsync[F, I, O](parallelism)(input => Stream.eval(f(input)))
/**
 * Splits the input into one sub-stream per key and merges the outputs of all
 * sub-streams back into a single stream.
 *
 * Every element is routed through a bounded queue owned by its key. The first
 * element seen for a key creates that queue and emits a new sub-stream that
 * drains it through `subStream(key)`. When the input ends, `None` is enqueued
 * on every queue so each sub-stream terminates.
 *
 * @param f           computes the grouping key of an element
 * @param groupBuffer capacity of each per-key queue
 * @param subStream   pipe applied to the elements of one group, given its key
 * @return a pipe emitting the merged output of all group sub-streams
 */
def groupBy[F[_]: Effect, I, O, K](f: I => K, groupBuffer: Int = 16)(subStream: K => Pipe[F, I, O])(implicit executionContext: ExecutionContext): Pipe[F, I, O] = {
  type Queues = Map[K, Queue[F, Option[I]]]
  val nothingToEmit = Option.empty[Stream[F, O]]

  // Stateful router: the state is the map of known queues, the output is a
  // newly created sub-stream (if the element opened a new group).
  val router: Pipe[F, Option[I], (Queues, Option[Stream[F, O]])] =
    evalMapAccumulate[F, Queues, Option[I], Option[Stream[F, O]]](Effect[F].pure(Map.empty[K, Queue[F, Option[I]]])) {
      case (known, element @ Some(value)) =>
        val key = f(value)
        known.get(key) match {
          case Some(existing) =>
            // Group already open: just hand the element over.
            existing.enqueue1(element).map(_ => (known, nothingToEmit))
          case None =>
            // New group: create its queue, seed it with the element, then
            // expose the draining sub-stream downstream.
            for {
              fresh <- Queue.bounded[F, Option[I]](groupBuffer)
              _ <- fresh.enqueue1(element)
            } yield {
              val grouped = Option(fresh.dequeueAvailable.unNoneTerminate.through(subStream(key)))
              (known + (key -> fresh), grouped)
            }
        }
      case (known, None) =>
        // End of input: enqueue the terminator on every queue, one after another.
        val closeAll = known.valuesIterator.foldLeft(Effect[F].unit) { (pending, queue) =>
          pending.flatMap(_ => queue.enqueue1(None))
        }
        closeAll.map(_ => (known, nothingToEmit))
    }

  source =>
    source.noneTerminate
      .through(router)
      .map { case (_, emitted) => emitted }
      .unNone
      .joinUnbounded
}
/**
 * Effectful `mapAccumulate`: threads a state through the stream, running `f`
 * on the current state and each input element, and emits every
 * `(newState, output)` pair that `f` produces.
 *
 * @param init effect producing the initial state; evaluated once when the
 *             resulting stream starts
 * @param f    effectful step from `(state, input)` to `(newState, output)`
 * @return a pipe emitting one `(state, output)` pair per input element
 */
def evalMapAccumulate[F[_]: Monad, S, I, O](init: F[S])(f: (S, I) => F[(S, O)]): Pipe[F, I, (S, O)] = {
  // Pulls one element at a time, runs the step, emits its result and recurses
  // with the updated state.
  def loop(state: S, remaining: Stream[F, I]): Pull[F, (S, O), Unit] =
    remaining.pull.uncons1.flatMap {
      case Some((i, rest)) =>
        Pull.eval(f(state, i)).flatMap { so =>
          Pull.output1(so) >> loop(so._1, rest)
        }
      case None => Pull.done
    }

  // `init` is an F[S], not an S: it has to be evaluated before the loop can
  // start. (Passing it straight to the loop is a type mismatch.)
  in => Stream.eval(init).flatMap(start => loop(start, in).stream)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment