Skip to content

Instantly share code, notes, and snippets.

@DomBlack
Created August 17, 2015 08:20
Show Gist options
  • Save DomBlack/568752041e4ec0cecc18 to your computer and use it in GitHub Desktop.
Save DomBlack/568752041e4ec0cecc18 to your computer and use it in GitHub Desktop.
Generic Akka stream operations for carrying extra information around operations
import akka.stream.scaladsl.Flow
import scala.concurrent.{ ExecutionContext, Future }
import scalaz._
import scalaz.std.scalaFuture.futureInstance // IntelliJ lies
/**
 * Akka Streams `Flow` builders that operate on the value held inside a container `M`,
 * delegating each operation to the matching scalaz type class.
 *
 * Fix the `M` type and the input type up front and let the compiler infer the output type:
 * {{{
 * // Flow[Option[Int], Option[String], Unit]
 * val process = Flowz[Option, Int].map(i => (i * 2).toString)
 *
 * source ~> process ~> sink
 * }}}
 *
 * The constructor is private; obtain instances through `Flowz.apply`.
 *
 * @tparam M The container type
 * @tparam A The inner type
 */
class Flowz[M[_], A] private () {

  /** Transforms the content of each `M` with `f`, keeping the container. */
  def map[B](f: A => B)(implicit ev: Functor[M]): Flow[M[A], M[B], Unit] = {
    // Lift once while building the flow; the lifted function is reused for every element.
    val lifted: M[A] => M[B] = ev.lift(f)
    Flow[M[A]].map(lifted)
  }

  /**
   * Asynchronously transforms the content of each `M` with `f`.
   *
   * Each element is traversed with `f` (yielding a `Future[M[B]]`) and handed to
   * `Flow.mapAsync` with the given `parallelism`.
   */
  def mapAsync[B](parallelism: Int)(f: A => Future[B])(
    implicit ev: Traverse[M], ex: ExecutionContext
  ): Flow[M[A], M[B], Unit] =
    Flow[M[A]].mapAsync(parallelism)(ma => ev.traverse(ma)(f))

  /**
   * Asynchronously transforms the content of each `M` with `f`, built on
   * `Flow.mapAsyncUnordered` (which, per Akka's documentation, does not preserve
   * the upstream element order).
   */
  def mapAsyncUnordered[B](parallelism: Int)(f: A => Future[B])(
    implicit ev: Traverse[M], ex: ExecutionContext
  ): Flow[M[A], M[B], Unit] =
    Flow[M[A]].mapAsyncUnordered(parallelism)(ma => ev.traverse(ma)(f))

  /**
   * Applies a function wrapped in `M` to each incoming value wrapped in `M`.
   *
   * `f` is by-name and is passed along inside the per-element function rather than
   * being evaluated up front.
   */
  def ap[B](f: => M[A => B])(implicit ev: Apply[M]): Flow[M[A], M[B], Unit] =
    Flow[M[A]].map(ev.ap(_)(f))

  /** Wraps each incoming bare value in an `M`. */
  def point(implicit ev: Applicative[M]): Flow[A, M[A], Unit] =
    Flow[A].map(ev.point(_))

  /** Flat maps `f` over the content of each `M`. */
  def bind[B](f: A => M[B])(implicit ev: Bind[M]): Flow[M[A], M[B], Unit] =
    Flow[M[A]].map(ev.bind(_)(f))

  /**
   * Maps `f` over the content of each `M` and combines the results through `Monoid[B]`,
   * emitting a plain `B` (losing the `M` container).
   */
  def foldMap[B](f: A => B)(implicit ev: Foldable[M], F: Monoid[B]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldMap(_)(f))

  /** Folds left over each `M`, starting from `z`, and emits the accumulated `B`. */
  def foldLeft[B](z: B)(f: (B, A) => B)(implicit ev: Foldable[M]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldLeft(_, z)(f))

  /** Folds right over each `M`, starting from `z`, and emits the accumulated `B`. */
  def foldRight[B](z: B)(f: (A, => B) => B)(implicit ev: Foldable[M]): Flow[M[A], B, Unit] =
    Flow[M[A]].map(ev.foldRight(_, z)(f))

  /**
   * Runs `f` (which yields a `G[B]`) over the content of each `M` and swaps the two
   * containers around, emitting `G[M[B]]`.
   */
  def traverse[G[_], B](f: A => G[B])(implicit ev: Traverse[M], G: Applicative[G]): Flow[M[A], G[M[B]], Unit] = // scalastyle:ignore
    Flow[M[A]].map(ev.traverse(_)(f))

  /** Given a stream of `M[G[A]]`, swaps `M` and `G` over to emit a stream of `G[M[A]]`. */
  def sequence[G[_]](implicit ev: Traverse[M], G: Applicative[G]): Flow[M[G[A]], G[M[A]], Unit] =
    Flow[M[G[A]]].map(ev.sequence(_))

  // <editor-fold desc="aliases">

  /** Alias for [[bind]] */
  @inline
  def flatMap[B](f: A => M[B])(implicit ev: Bind[M]): Flow[M[A], M[B], Unit] =
    bind(f)

  // </editor-fold>
}
object Flowz {

  /**
   * Builds the [[Flowz]] operation set for streams whose elements are `M[A]`.
   *
   * @tparam M The container type
   * @tparam A The inner type
   */
  def apply[M[_], A]: Flowz[M, A] =
    new Flowz[M, A]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment