Last active
October 5, 2017 20:15
-
-
Save rtitle/894bf731d63bf2127109acfae8e75475 to your computer and use it in GitHub Desktop.
Monad Transformer Talk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| \gMonad Transformers! | |
Rob Title | |
2017-10-05 | |
--- | |
| \gA Toy Example | |
-- | |
\*First, the object model | |
``` | |
case class Cluster(id: String, bucketId: Option[String]) | |
case class Bucket(id: String) | |
def getClusterById(id: String): Option[Cluster] = { | |
Some(Cluster("cluster", Some("bucket"))) | |
} | |
def getBucketById(id: String): Option[Bucket] = { | |
Some(Bucket("bucket")) | |
} | |
def deleteBucket(bucket: Bucket): Unit = { | |
println(s"Deleting bucket $bucket!") | |
} | |
``` | |
--- | |
| \gAttempt 1 | |
``` | |
def deleteClusterBucket(clusterId: String): Option[Bucket] = { | |
for { | |
cluster <- getClusterById(clusterId) | |
bucketId <- cluster.bucketId | |
bucket <- getBucketById(bucketId) | |
_ <- Option(deleteBucket(bucket)) | |
} yield bucket | |
} | |
deleteClusterBucket("cluster") | |
``` | |
--- | |
| \gLet's add an effect | |
``` | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
def getClusterById(id: String): Future[Option[Cluster]] = { | |
Future(Some(Cluster("cluster", Some("bucket")))) | |
} | |
def getBucketById(id: String): Future[Option[Bucket]] = { | |
Future(Some(Bucket("bucket"))) | |
} | |
def deleteBucket(bucket: Bucket): Future[Unit] = { | |
Future(println(s"Deleting bucket $bucket!")) | |
} | |
``` | |
--- | |
| \gAttempt 2 | |
``` | |
// Uh oh | |
def deleteClusterBucket(clusterId: String): Option[Bucket] = { | |
for { | |
cluster <- getClusterById(clusterId) | |
bucketId <- cluster.bucketId | |
bucket <- getBucketById(bucketId) | |
_ <- Option(deleteBucket(bucket)) | |
} yield bucket | |
} | |
``` | |
-- | |
\*\rIt only unwraps 1 level deep | |
--- | |
| \g Attempt 2 Revised | |
``` | |
// Ugh | |
def deleteClusterBucket(clusterId: String): Future[Option[Bucket]] = { | |
for { | |
clusterOpt <- getClusterById(clusterId) | |
bucketIdOpt <- Future.successful(clusterOpt.flatMap(_.bucketId)) | |
bucketOpt <- bucketIdOpt match { | |
case None => Future.successful(None) | |
case Some(bucketId) => getBucketById(bucketId) | |
} | |
_ <- bucketOpt match { | |
case None => Future.successful(()) | |
case Some(bucket) => deleteBucket(bucket) | |
} | |
} yield bucketOpt | |
} | |
deleteClusterBucket("cluster") | |
``` | |
--- | |
| \gSolution: Define a Wrapper | |
``` | |
case class FutureOption[A](inner: Future[Option[A]]) | |
``` | |
--- | |
| \gAnd make it a monad | |
``` | |
import cats._, cats.implicits._, cats.data._ | |
implicit def futureOptionMonad = new Monad[FutureOption] { | |
def pure[A](a: A): FutureOption[A] = | |
FutureOption(Future.successful(Option(a))) | |
def flatMap[A, B](fa: FutureOption[A]) | |
(f: A => FutureOption[B]): FutureOption[B] = { | |
FutureOption { | |
fa.inner.flatMap { | |
case Some(a) => f(a).inner | |
case None => Future.successful(None) | |
} | |
} | |
} | |
// Ignore | |
def tailRecM[A, B](a: A) | |
(f: A => FutureOption[Either[A, B]]): FutureOption[B] = ??? | |
} | |
``` | |
--- | |
| \gAttempt 3 | |
``` | |
def deleteClusterBucket(clusterId: String): Future[Option[Bucket]] = { | |
val futureOption: FutureOption[Bucket] = for { | |
cluster <- FutureOption(getClusterById(clusterId)) | |
bucketId <- FutureOption(Future.successful(cluster.bucketId)) | |
bucket <- FutureOption(getBucketById(bucketId)) | |
_ <- FutureOption(deleteBucket(bucket).map(Option(_))) | |
} yield bucket | |
futureOption.inner | |
} | |
deleteClusterBucket("cluster") | |
``` | |
--- | |
| \gWe can generalize | |
``` | |
case class SomethingOption[F[_], A](inner: F[Option[A]]) | |
implicit def somethingOptionMonad[F[_]: Monad] = | |
new Monad[SomethingOption[F, ?]] { | |
def pure[A](a: A): SomethingOption[F, A] = | |
SomethingOption(Option(a).pure[F]) | |
def flatMap[A, B](fa: SomethingOption[F, A]) | |
(f: A => SomethingOption[F, B]): SomethingOption[F, B] = { | |
SomethingOption { | |
fa.inner.flatMap { | |
case Some(a) => f(a).inner | |
case None => none[B].pure[F] | |
} | |
} | |
} | |
// Ignore | |
def tailRecM[A, B](a: A) | |
(f: A => SomethingOption[F, Either[A, B]]): SomethingOption[F, B] = ??? | |
} | |
``` | |
-- | |
\*\rThis is just cats' OptionT! | |
--- | |
| \gAttempt 4 (final) | |
``` | |
def deleteClusterBucket(clusterId: String): Future[Option[Bucket]] = { | |
val trans: OptionT[Future, Bucket] = for { | |
cluster <- OptionT[Future, Cluster] (getClusterById(clusterId)) | |
bucketId <- OptionT.fromOption[Future] (cluster.bucketId) | |
bucket <- OptionT[Future, Bucket] (getBucketById(bucketId)) | |
_ <- OptionT.liftF[Future, Unit](deleteBucket(bucket)) | |
} yield bucket | |
trans.value | |
} | |
deleteClusterBucket("cluster") | |
``` | |
--- | |
| \gCan we generalize more? | |
``` | |
implicit def impossibleMonad[F[_]: Monad, G[_]: Monad] = | |
new Monad[Lambda[a => F[G[a]]]] { | |
// This is easy | |
def pure[A](a: A): F[G[A]] = | |
a.pure[G].pure[F] | |
/* | |
* This can't be done (you're welcome to try). | |
* No matter what you'll end up with something like F[G[F[G[B]]], | |
* which can't be flattened into F[G[B]]. | |
*/ | |
def flatMap[A, B](fga: F[G[A]])(f: A => F[G[B]]): F[G[B]] = { | |
??? | |
} | |
// Ignore | |
def tailRecM[A, B](a: A) | |
(f: A => F[G[Either[A,B]]]): F[G[B]] = ??? | |
} | |
``` | |
-- | |
\*\rMonads don't compose! | |
--- | |
| \gOther monad combinations | |
=================================== | |
Transformer Underlying | |
=================================== | |
OptionT[M[_], A] M[Option[A] | |
EitherT[M[_], E, A] M[Either[E, A]] | |
ReaderT[M[_], A, B] A => M[B] | |
StateT[M[_], S, A] S => M[(S, A)] | |
WriterT[M[_], A, B] M[(A, B)] | |
--- | |
| \g A practical example (Leonardo) | |
-- | |
\r\b1. Define a monad stack | |
``` | |
type HttpResult[A] = EitherT[IO, LeoException, A] | |
``` | |
-- | |
\r\b2. Define utilties for lifting into that type | |
``` | |
object HttpResult { | |
def pure[A](a: A): HttpResult[A] = | |
EitherT.pure(a) | |
def fromFuture[A](f: Future[A]): HttpResult[A] = | |
IO.fromFuture(always(f)).attemptT | |
.leftMap(t => InternalError(t.getMessage, t)) | |
def fromEither[A](either: Either[LeoException, A]): HttpResult[A] = | |
EitherT.fromEither(either) | |
def fromOption[A](aOpt: Option[A], error: LeoException): HttpResult[A] = | |
fromEither(aOpt.toRight(error)) | |
def raiseError[A](e: LeoException): HttpResult[A] = | |
fromEither(e.raiseError) | |
} | |
``` | |
--- | |
| \g Practical example (Leonardo) | |
\r\b3. Build sub-programs | |
``` | |
def createCluster(request: ClusterRequest): HttpResult[Cluster] = { | |
for { | |
clusterOpt <- dbRef.inTransaction { _.clusterQuery.getByName(request.clusterName) } | |
_ <- if (clusterOpt.isEmpty) HttpResult.pure(()) | |
else HttpResult.raiseError(ClusterAlreadyExistsException(request.clusterName)) | |
_ <- validateJupyterExtensionUri(request.jupyterExtension) | |
_ <- HttpResult.fromFuture(gdDAO.updateFirewallRule(request.googleProject)) | |
bucketName <- HttpResult.pure(generateUniqueBucketName(clusterRequest.clusterName)) | |
_ <- initializeBucket(bucketName, request) | |
cluster <- HttpResult.fromFuture(gdDAO.createCluster(clusterRequest)) | |
} yield cluster | |
} | |
``` | |
--- | |
| \g Practical example (Leonardo) | |
\r\b4. Finally, commit the http response | |
``` | |
def marshalHttpResult[A](httpResult: HttpResult[A]) | |
: IO[ToResponseMarshallable] = { | |
httpResult.fold ( | |
{ leoEx => leoEx.statusCode -> leoEx.toErrorReport }, | |
{ a => StatusCodes.OK -> a } | |
) | |
} | |
// in your route: | |
complete { | |
val httpResult: HttpResult[Cluster] = | |
leonardoService.createCluster(clusterRequest) | |
marshalHttpResult(httpResult).unsafeToFuture() | |
} | |
``` | |
--- | |
| \g Problems with monad transformers | |
-- | |
\*Larger stacks become unwieldy | |
``` | |
import cats.effect._ | |
trait Repository { | |
def getClusterById(id: String): IO[Option[Cluster]] = { | |
IO(Some(Cluster("cluster", Some("bucket")))) | |
} | |
def getBucketById(id: String): IO[Option[Bucket]] = { | |
IO(Some(Bucket("bucket"))) | |
} | |
def deleteBucket(bucket: Bucket): IO[Unit] = { | |
IO(println(s"Deleting bucket $bucket!")) | |
} | |
} | |
object MyRepository extends Repository | |
type Stack[A] = | |
ReaderT[OptionT[IO, ?], Repository, A] | |
``` | |
--- | |
| \g Lifting is hard | |
``` | |
type Stack[A] = ReaderT[OptionT[IO, ?], Repository, A] | |
def deleteClusterBucket(clusterId: String): Stack[Bucket] = { | |
for { | |
cluster <- ReaderT[OptionT[IO, ?], Repository, Cluster]( | |
(r: Repository) => OptionT(r.getClusterById(clusterId))) | |
bucketId <- ReaderT[OptionT[IO, ?], Repository, String]( | |
(_: Repository) => OptionT.fromOption(cluster.bucketId)) | |
bucket <- ReaderT[OptionT[IO, ?], Repository, Bucket]( | |
(r: Repository) => OptionT(r.getBucketById(bucketId))) | |
_ <- ReaderT[OptionT[IO, ?], Repository, Unit]( | |
(r: Repository) => OptionT.liftF(r.deleteBucket(bucket))) | |
} yield bucket | |
} | |
deleteClusterBucket("cluster").run(MyRepository).value.unsafeRunSync | |
``` | |
--- | |
| \g Improvements | |
\* https://github.com/typelevel/cats-mtl | |
\* mtl = monad transformer library | |
``` | |
import cats.mtl._ | |
type Stack[A] = OptionT[IO, A] | |
def deleteClusterBucket(clusterId: String) | |
(implicit r: ApplicativeAsk[Stack, Repository]): Stack[Bucket] = | |
for { | |
r <- r.ask | |
cluster <- OptionT(r.getClusterById(clusterId)) | |
bucketId <- OptionT.fromOption[IO](cluster.bucketId) | |
bucket <- OptionT(r.getBucketById(bucketId)) | |
_ <- OptionT.liftF[IO, Unit](r.deleteBucket(bucket)) | |
} yield bucket | |
// I'm confused about how to provide the ApplicativeAsk instance | |
deleteClusterBucket("cluster") | |
``` | |
--- | |
| \g Eff | |
\* https://github.com/atnos-org/eff | |
\* eff = extensible effects | |
\* "eff is a monad for abstracting over a free coproduct of effects" | |
-- | |
``` | |
// Eff seems to support Futures but not IO, so we're going back to futures | |
trait Repository { | |
def getClusterById(id: String): Future[Option[Cluster]] = { | |
Future(Some(Cluster("cluster", Some("bucket")))) | |
} | |
def getBucketById(id: String): Future[Option[Bucket]] = { | |
Future(Some(Bucket("bucket"))) | |
} | |
def deleteBucket(bucket: Bucket): Future[Unit] = { | |
Future(println(s"Deleting bucket $bucket!")) | |
} | |
} | |
object MyRepository extends Repository | |
``` | |
--- | |
| \g Eff (continued) | |
``` | |
import org.atnos.eff._ | |
import org.atnos.eff.all._ | |
import org.atnos.eff.future._ | |
import org.atnos.eff.syntax.all._ | |
type ReaderRepo[A] = Reader[Repository, A] | |
type Stack = Fx.fx3[ReaderRepo, TimedFuture, Option] | |
type _readerRepo[R] = ReaderRepo |= R | |
def deleteClusterBucket[R :_readerRepo :_future :_option] | |
(clusterId: String): Eff[R, Bucket] = { | |
for { | |
repo <- ask[R, Repository] | |
clusterOpt <- fromFuture(repo.getClusterById(clusterId)) | |
cluster <- fromOption(clusterOpt) | |
bucketId <- fromOption(cluster.bucketId) | |
bucketOpt <- fromFuture(repo.getBucketById(bucketId)) | |
bucket <- fromOption(bucketOpt) | |
_ <- fromFuture(repo.deleteBucket(bucket)) | |
} yield bucket | |
} | |
// WTF doesn't this compile | |
deleteClusterBucket[Stack]("cluster").runReader(MyRepository) | |
``` | |
--- | |
| \g Emm | |
\* https://github.com/djspiewak/emm | |
\* "emm composes monads" | |
\* requires Traverse | |
-- | |
``` | |
implicit def notSoImpossibleMonad[F[_]: Monad, G[_]: Monad: Traverse] = | |
new Monad[Lambda[a => F[G[a]]]] { | |
def flatMap[A, B](fga: F[G[A]])(f: A => F[G[B]]): F[G[B]] = { | |
// F[G[F[G[B]]]] via map/flatMap | |
// F[F[G[G[B]]]] via traverse | |
// F[G[B]] via flatten | |
} | |
} | |
``` | |
-- | |
\*\r Not a lawful monad! Don't use this for real work. | |
--- | |
| \g Emm usage | |
``` | |
import emm._ | |
import emm.compat.cats._ | |
type E = Reader |: IO |: Option |: Base | |
// note quite correct | |
def deleteClusterBucket(clusterId: String): Emm[E, Bucket] = { | |
for { | |
r <- getRepo.liftM[E] | |
cluster <- r.getClusterById(clusterId).liftM[E] | |
bucketId <- cluster.bucketId.liftM[E] | |
bucket <- r.getBucketById(bucketId).liftM[E] | |
_ <- r.deleteBucket(bucket).liftM[E] | |
} yield bucket | |
} | |
deleteClusterBucket("cluster").run | |
``` | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To actually run this presentation:
n
to advance slides,p
to go back, and!!
to compile the current slide.