Created
November 4, 2016 10:57
-
-
Save jdesiloniz/236bee9a2d22b6b3f12db1c7399b5b95 to your computer and use it in GitHub Desktop.
sdasdasdadsa
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.data.NonEmptyList | |
import fetch._ | |
/**
 * A source of `Result` values addressed by `Identity`.
 *
 * Both operations return a `Query` describing the lookup rather than the looked-up value itself.
 *
 * @tparam Identity type used to identify a value in this source
 * @tparam Result   type of the values this source produces
 */
trait DataSource[Identity, Result] {

  /** Query for a single identity; the `Option` in the result type leaves room for a missing value. */
  def fetchOne(id: Identity): Query[Option[Result]]

  /** Query for a non-empty batch of identities, yielding the results keyed by identity. */
  def fetchMany(ids: NonEmptyList[Identity]): Query[Map[Identity, Result]]
}
/** Identifier of a [[User]]; a plain `Int` alias. */
type UserId = Int

/**
 * A user record.
 *
 * Declared `final`, like `ForgetfulCache` further down, so the generated
 * `equals`/`hashCode`/`copy` cannot be broken by a subclass.
 *
 * @param id       the user's identity
 * @param username the user's handle (the sample data uses values such as "@one")
 */
final case class User(id: UserId, username: String)
/**
 * Simulates a slow call: logs a start marker, sleeps for 100 ms, logs an end marker,
 * then hands back `result` unchanged.
 *
 * `result` is a by-value parameter, so it has already been computed when the first
 * marker is printed. Both markers include the id of the calling thread.
 *
 * @param result value to return once the simulated delay has elapsed
 * @param msg    label printed in both log lines
 * @return `result`, untouched
 */
def latency[A](result: A, msg: String): A = {
  val threadId = Thread.currentThread().getId
  println(s"~~> [$threadId] $msg")
  Thread.sleep(100)
  println(s"<~~ [$threadId] $msg")
  result
}
import cats.data.NonEmptyList | |
import cats.std.list._ | |
import fetch._ | |
/** In-memory stand-in for a user table: each sample user indexed by its own id. */
val userDatabase: Map[UserId, User] =
  List(
    User(1, "@one"),
    User(2, "@two"),
    User(3, "@three"),
    User(4, "@four")
  ).map(user => user.id -> user).toMap
/**
 * Data source resolving [[User]]s by [[UserId]] from the in-memory `userDatabase`.
 *
 * It is `implicit` so that `Fetch(id)` can pick it up without naming it (see `getUser`).
 * Every lookup is routed through `latency`, which logs the call and sleeps.
 */
implicit object UserSource extends DataSource[UserId, User] {

  /** Looks up one user; the query yields `None` when `id` is not a key of `userDatabase`. */
  override def fetchOne(id: UserId): Query[Option[User]] =
    Query.sync(latency(userDatabase.get(id), s"One User $id"))

  /**
   * Looks up several users in a single query.
   *
   * Identities that are not keys of `userDatabase` are simply absent from the resulting map.
   */
  override def fetchMany(ids: NonEmptyList[UserId]): Query[Map[UserId, User]] =
    Query.sync {
      // A Set gives constant-time membership tests (the unwrapped list would be scanned
      // once per database key), and `filter` builds a strict Map, whereas `filterKeys`
      // hands back a lazy view that re-runs the predicate on every access.
      val wanted = ids.unwrap.toSet
      latency(userDatabase.filter { case (id, _) => wanted(id) }, s"Many Users $ids")
    }
}
/** Describes fetching the user with the given id, naming `UserSource` explicitly instead of resolving it implicitly. */
def getUser(id: UserId): Fetch[User] = Fetch(id)(UserSource)
/**
 * Example of a data source without a batched implementation: each `Int` identity
 * resolves to itself.
 */
implicit object UnbatchedSource extends DataSource[Int, Int] {

  /** Wraps the identity itself as the result of the query. */
  override def fetchOne(id: Int): Query[Option[Int]] =
    Query.sync(Option(id))

  /**
   * Delegates to `batchingNotSupported`.
   *
   * NOTE(review): `batchingNotSupported` is not defined in this excerpt; presumably it is
   * provided by the library's `DataSource` — confirm.
   */
  override def fetchMany(ids: NonEmptyList[Int]): Query[Map[Int, Int]] =
    batchingNotSupported(ids)
}
val fetchUser: Fetch[User] = getUser(1) | |
import cats.Id | |
import fetch.unsafe.implicits._ | |
import fetch.syntax._ | |
fetchUser.runA[Id] | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// res3: cats.Id[User] = User(1,@one) | |
// The second fetch depends on the first one's result (its id + 1), so the two
// steps are chained with flatMap rather than combined independently.
val fetchTwoUsers: Fetch[(User, User)] =
  getUser(1).flatMap { aUser =>
    getUser(aUser.id + 1).map(anotherUser => (aUser, anotherUser))
  }
fetchTwoUsers.runA[Id] | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// ~~> [92] One User 2 | |
// <~~ [92] One User 2 | |
// res4: cats.Id[(User, User)] = (User(1,@one),User(2,@two)) | |
import cats.syntax.cartesian._ | |
val fetchProduct: Fetch[(User, User)] = getUser(1).product(getUser(2)) | |
fetchProduct.runA[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2)) | |
// <~~ [92] Many Users OneAnd(1,List(2)) | |
// res6: cats.Id[(User, User)] = (User(1,@one),User(2,@two)) | |
val fetchDuped: Fetch[(User, User)] = getUser(1).product(getUser(1)) | |
fetchDuped.runA[Id] | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// res7: cats.Id[(User, User)] = (User(1,@one),User(1,@one)) | |
// Requests the same identity twice in sequence; the transcript below shows a single
// "One User 1" round for this fetch.
val fetchCached: Fetch[(User, User)] =
  getUser(1).flatMap { aUser =>
    getUser(1).map(anotherUser => (aUser, anotherUser))
  }
fetchCached.runA[Id] | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// res8: cats.Id[(User, User)] = (User(1,@one),User(1,@one)) | |
Query.sync(42) | |
// res9: fetch.Query[Int] = Sync(cats.Later@52caf59b) | |
Query.sync({ println("Computing 42"); 42 }) | |
// res10: fetch.Query[Int] = Sync(cats.Later@7ffb6457) | |
import cats.Eval | |
// import cats.Eval | |
Query.eval(Eval.always({ println("Computing 42"); 42 })) | |
// res11: fetch.Query[Int] = Sync(cats.Always@3e6bbcd5) | |
Query.async((ok: (Int => Unit), fail) => { | |
Thread.sleep(100) | |
ok(42) | |
}) | |
// res12: fetch.Query[Int] = Async(<function2>,Duration.Inf) | |
/** Identifier of a [[Post]]; a plain `Int` alias. */
type PostId = Int

/**
 * A post record.
 *
 * Declared `final`, like `ForgetfulCache` further down, so the generated
 * `equals`/`hashCode`/`copy` cannot be broken by a subclass. This matters here because
 * `Post` values are used as map keys by `PostTopicSource`.
 *
 * @param id      the post's identity
 * @param author  id of the user who wrote the post
 * @param content the post's text
 */
final case class Post(id: PostId, author: UserId, content: String)

/** In-memory stand-in for a post table, keyed by post id. */
val postDatabase: Map[PostId, Post] = Map(
  1 -> Post(1, 2, "An article"),
  2 -> Post(2, 3, "Another article"),
  3 -> Post(3, 4, "Yet another article")
)
/**
 * Data source resolving [[Post]]s by [[PostId]] from the in-memory `postDatabase`.
 *
 * It is `implicit` so that `Fetch(id)` can pick it up without naming it (see `getPost`).
 * Every lookup is routed through `latency`, which logs the call and sleeps.
 */
implicit object PostSource extends DataSource[PostId, Post] {

  /** Looks up one post; the query yields `None` when `id` is not a key of `postDatabase`. */
  override def fetchOne(id: PostId): Query[Option[Post]] =
    Query.sync(latency(postDatabase.get(id), s"One Post $id"))

  /**
   * Looks up several posts in a single query.
   *
   * Identities that are not keys of `postDatabase` are simply absent from the resulting map.
   */
  override def fetchMany(ids: NonEmptyList[PostId]): Query[Map[PostId, Post]] =
    Query.sync {
      // A Set gives constant-time membership tests (the unwrapped list would be scanned
      // once per database key), and `filter` builds a strict Map, whereas `filterKeys`
      // hands back a lazy view that re-runs the predicate on every access.
      val wanted = ids.unwrap.toSet
      latency(postDatabase.filter { case (id, _) => wanted(id) }, s"Many Posts $ids")
    }
}
/** Describes fetching the post with the given id from `PostSource`. */
def getPost(id: PostId): Fetch[Post] = Fetch(id)(PostSource)

/** Describes fetching the user referenced by the post's `author` field from `UserSource`. */
def getAuthor(p: Post): Fetch[User] = Fetch(p.author)(UserSource)
type PostTopic = String | |
/**
 * Data source that derives a topic for a [[Post]]; the post itself is the identity.
 *
 * No database is consulted: the topic is computed from the parity of the post id.
 * Lookups still go through `latency`, which logs the call and sleeps.
 */
implicit object PostTopicSource extends DataSource[Post, PostTopic] {

  /** Even post ids map to "monad", all other ids to "applicative". */
  private def topicFor(post: Post): PostTopic =
    if (post.id % 2 == 0) "monad" else "applicative"

  /** Computes the topic of a single post; the result is always defined. */
  override def fetchOne(id: Post): Query[Option[PostTopic]] =
    Query.sync {
      val topic = topicFor(id)
      latency(Option(topic), s"One Post Topic $id")
    }

  /** Computes the topics of a batch of posts, keyed by post. */
  override def fetchMany(ids: NonEmptyList[Post]): Query[Map[Post, PostTopic]] =
    Query.sync {
      val topicsByPost = ids.unwrap.map(post => post -> topicFor(post)).toMap
      latency(topicsByPost, s"Many Post Topics $ids")
    }
}
/** Describes fetching the topic of the given post from `PostTopicSource`. */
def getPostTopic(post: Post): Fetch[PostTopic] = Fetch(post)(PostTopicSource)
// Combines two different data sources: the post is fetched first, then its topic,
// which needs the fetched post as its identity.
val fetchMulti: Fetch[(Post, PostTopic)] =
  getPost(1).flatMap { post =>
    getPostTopic(post).map(topic => (post, topic))
  }
fetchMulti.runA[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// ~~> [92] One Post Topic Post(1,2,An article) | |
// <~~ [92] One Post Topic Post(1,2,An article) | |
// res16: cats.Id[(Post, PostTopic)] = (Post(1,2,An article),applicative) | |
val fetchConcurrent: Fetch[(Post, User)] = getPost(1).product(getUser(2)) | |
fetchConcurrent.runA[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// ~~> [92] One User 2 | |
// <~~ [92] One User 2 | |
// res17: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two)) | |
import scala.concurrent._ | |
import ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import fetch.implicits._ | |
// import fetch.implicits._ | |
Await.result(fetchConcurrent.runA[Future], Duration.Inf) | |
// ~~> [93] One Post 1 | |
// ~~> [75] One User 2 | |
// <~~ [93] One Post 1 | |
// <~~ [75] One User 2 | |
// res18: (Post, User) = (Post(1,2,An article),User(2,@two)) | |
import cats.std.list._ | |
import cats.syntax.traverse._ | |
val fetchSequence: Fetch[List[User]] = List(getUser(1), getUser(2), getUser(3)).sequence | |
fetchSequence.runA[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2, 3)) | |
// <~~ [92] Many Users OneAnd(1,List(2, 3)) | |
// res20: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three)) | |
val fetchTraverse: Fetch[List[User]] = List(1, 2, 3).traverse(getUser) | |
fetchTraverse.runA[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2, 3)) | |
// <~~ [92] Many Users OneAnd(1,List(2, 3)) | |
// res21: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three)) | |
val cache = InMemoryCache(UserSource.identity(1) -> User(1, "@dialelo")) | |
// cache: fetch.InMemoryCache = InMemoryCache(Map((UserSource$@36695a49,1) -> User(1,@dialelo))) | |
Fetch.run[Id](fetchUser, cache) | |
// res22: cats.Id[User] = User(1,@dialelo) | |
fetchUser.runA[Id](cache) | |
// res23: cats.Id[User] = User(1,@dialelo) | |
val fetchManyUsers: Fetch[List[User]] = List(1, 2, 3).traverse(getUser) | |
fetchManyUsers.runA[Id](cache) | |
// ~~> [92] Many Users OneAnd(2,List(3)) | |
// <~~ [92] Many Users OneAnd(2,List(3)) | |
// res24: cats.Id[List[User]] = List(User(1,@dialelo), User(2,@two), User(3,@three)) | |
val env = fetchManyUsers.runE[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2, 3)) | |
// <~~ [92] Many Users OneAnd(1,List(2, 3)) | |
// env: cats.Id[fetch.FetchEnv] = FetchEnv(InMemoryCache(Map((UserSource$@36695a49,1) -> User(1,@one), (UserSource$@36695a49,2) -> User(2,@two), (UserSource$@36695a49,3) -> User(3,@three))),List(),Queue(Round(InMemoryCache(Map()),Concurrent,ConcurrentRound(Map(UserSource$@36695a49 -> List(1, 2, 3))),95471526461769,95471630826900,false))) | |
fetchManyUsers.runA[Id](env.cache) | |
// res25: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three)) | |
/**
 * Cache interface keyed by `DataSourceIdentity`.
 *
 * `update` returns a `DataSourceCache` rather than `Unit`, so an implementation can be
 * immutable and hand back a new cache instead of mutating itself.
 */
trait DataSourceCache {

  /** Looks up the value stored under `k`, if any. */
  def get[A](k: DataSourceIdentity): Option[A]

  /** Returns a cache in which `k` is associated with `v`. */
  def update[A](k: DataSourceIdentity, v: A): DataSourceCache
}
/**
 * A cache that never remembers anything: lookups always miss and updates are discarded.
 */
final case class ForgetfulCache() extends DataSourceCache {

  /** Ignores the value and returns this same cache. */
  override def update[A](k: DataSourceIdentity, v: A): ForgetfulCache = this

  /** Always a miss. */
  override def get[A](k: DataSourceIdentity): Option[A] = None
}
// Same identity requested twice in sequence; run with ForgetfulCache below, the
// transcript shows "One User 1" being executed twice.
val fetchSameTwice: Fetch[(User, User)] =
  getUser(1).flatMap { one =>
    getUser(1).map(another => (one, another))
  }
// fetchSameTwice: fetch.Fetch[(User, User)] = Gosub(Suspend(FetchOne(1,UserSource$@36695a49)),<function1>) | |
fetchSameTwice.runA[Id](ForgetfulCache()) | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// ~~> [92] One User 1 | |
// <~~ [92] One User 1 | |
// res26: cats.Id[(User, User)] = (User(1,@one),User(1,@one)) | |
val fetchError: Fetch[User] = (new Exception("Oh noes")).fetch | |
// scala> fetchError.runA[Id]
// java.lang.Exception: Oh noes
//   ... 1024 elided
import fetch.unsafe.implicits._ | |
import cats.Eval | |
// import cats.Eval | |
import cats.data.Xor | |
// import cats.data.Xor | |
val safeResult: Eval[Throwable Xor User] = FetchMonadError[Eval].attempt(fetchError.runA[Eval]) | |
// safeResult: cats.Eval[cats.data.Xor[Throwable,User]] = cats.Later@78cef96d | |
safeResult.value | |
// res28: cats.data.Xor[Throwable,User] = Left(java.lang.Exception: Oh noes) | |
import cats.syntax.applicativeError._ | |
// import cats.syntax.applicativeError._ | |
fetchError.runA[Eval].attempt.value | |
// res29: cats.data.Xor[Throwable,User] = Left(java.lang.Exception: Oh noes) | |
val fetchPure: Fetch[Int] = 42.fetch | |
fetchPure.runA[Id] | |
// res30: cats.Id[Int] = 42 | |
val fetchFail: Fetch[Int] = new Exception("Something went terribly wrong").fetch | |
// scala> fetchFail.runA[Id]
// java.lang.Exception: Something went terribly wrong
//   ... 1024 elided
val fetchJoined: Fetch[(Post, User)] = getPost(1).join(getUser(2)) | |
fetchJoined.runA[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// ~~> [92] One User 2 | |
// <~~ [92] One User 2 | |
// res32: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two)) | |
getPost(1).runA[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// res33: cats.Id[Post] = Post(1,2,An article) | |
getPost(1).runE[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// res34: cats.Id[fetch.FetchEnv] = FetchEnv(InMemoryCache(Map((PostSource$@5bb1530,1) -> Post(1,2,An article))),List(1),Queue(Round(InMemoryCache(Map()),PostSource$@5bb1530,OneRound(1),95475218554730,95475319980908,false))) | |
getPost(1).runF[Id] | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// res35: cats.Id[(fetch.FetchEnv, Post)] = (FetchEnv(InMemoryCache(Map((PostSource$@5bb1530,1) -> Post(1,2,An article))),List(1),Queue(Round(InMemoryCache(Map()),PostSource$@5bb1530,OneRound(1),95475458907291,95475559950380,false))),Post(1,2,An article)) | |
val fetchPure: Fetch[Int] = Fetch.pure(42) | |
Fetch.run[Id](fetchPure) | |
// res36: cats.Id[Int] = 42 | |
val fetchFail: Fetch[Int] = Fetch.error(new Exception("Something went terribly wrong")) | |
// scala> Fetch.run[Id](fetchFail)
// java.lang.Exception: Something went terribly wrong
//   ... 1024 elided
val fetchJoined: Fetch[(Post, User)] = Fetch.join(getPost(1), getUser(2)) | |
Fetch.run[Id](fetchJoined) | |
// ~~> [92] One Post 1 | |
// <~~ [92] One Post 1 | |
// ~~> [92] One User 2 | |
// <~~ [92] One User 2 | |
// res38: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two)) | |
val fetchSequence: Fetch[List[User]] = Fetch.sequence(List(getUser(1), getUser(2), getUser(3))) | |
Fetch.run[Id](fetchSequence) | |
// ~~> [92] Many Users OneAnd(1,List(2, 3)) | |
// <~~ [92] Many Users OneAnd(1,List(2, 3)) | |
// res39: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three)) | |
val fetchTraverse: Fetch[List[User]] = Fetch.traverse(List(1, 2, 3))(getUser) | |
Fetch.run[Id](fetchTraverse) | |
// ~~> [92] Many Users OneAnd(1,List(2, 3)) | |
// <~~ [92] Many Users OneAnd(1,List(2, 3)) | |
// res40: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three)) | |
import cats.syntax.cartesian._ | |
val fetchThree: Fetch[(Post, User, Post)] = (getPost(1) |@| getUser(2) |@| getPost(2)).tupled | |
fetchThree.runA[Id] | |
// ~~> [92] Many Posts OneAnd(1,List(2)) | |
// <~~ [92] Many Posts OneAnd(1,List(2)) | |
// ~~> [92] One User 2 | |
// <~~ [92] One User 2 | |
// res42: cats.Id[(Post, User, Post)] = (Post(1,2,An article),User(2,@two),Post(2,3,Another article)) | |
val fetchFriends: Fetch[String] = (getUser(1) |@| getUser(2)).map({ (one, other) => | |
s"${one.username} is friends with ${other.username}" | |
}) | |
// fetchFriends: fetch.Fetch[String] = Gosub(Gosub(Suspend(Concurrent(List(FetchMany(OneAnd(1,List(2)),UserSource$@36695a49)))),<function1>),<function1>) | |
fetchFriends.runA[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2)) | |
// <~~ [92] Many Users OneAnd(1,List(2)) | |
// res43: cats.Id[String] = @one is friends with @two | |
val fetchFriends: Fetch[String] = Fetch.join(getUser(1), getUser(2)).map({ case (one, other) => | |
s"${one.username} is friends with ${other.username}" | |
}) | |
// fetchFriends: fetch.Fetch[String] = Gosub(Gosub(Suspend(Concurrent(List(FetchMany(OneAnd(1,List(2)),UserSource$@36695a49)))),<function1>),<function1>) | |
fetchFriends.runA[Id] | |
// ~~> [92] Many Users OneAnd(1,List(2)) | |
// <~~ [92] Many Users OneAnd(1,List(2)) | |
// res44: cats.Id[String] = @one is friends with @two | |
// Fetches posts 1 and 2, then their authors, and returns the posts ordered by
// author username.
val postsByAuthor: Fetch[List[Post]] = for {
  posts   <- List(1, 2).traverse(getPost)
  authors <- posts.traverse(getAuthor)
} yield {
  val postsWithAuthors = posts.zip(authors)
  postsWithAuthors
    .sortBy { case (_, author) => author.username }
    .map { case (post, _) => post }
}
// Fetches posts 2 and 3, then their topics, and counts how many posts fall under
// each topic.
val postTopics: Fetch[Map[PostTopic, Int]] = for {
  posts  <- List(2, 3).traverse(getPost)
  topics <- posts.traverse(getPostTopic)
} yield {
  // Only the topics are needed for counting, so there is no need to zip them with
  // the posts. `map` builds a strict Map; `mapValues` would return a lazy view that
  // recomputes the sizes on every access.
  topics.groupBy(identity).map { case (topic, occurrences) => topic -> occurrences.size }
}
val homePage = (postsByAuthor |@| postTopics).tupled | |
Await.result(Fetch.run[Future](homePage), Duration.Inf) | |
// ~~> [94] Many Posts OneAnd(1,List(2, 3)) | |
// <~~ [94] Many Posts OneAnd(1,List(2, 3)) | |
// ~~> [75] Many Users OneAnd(2,List(3)) | |
// ~~> [94] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// <~~ [75] Many Users OneAnd(2,List(3)) | |
// <~~ [94] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// res47: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1)) | |
"com.fortysevendeg" %% "fetch-monix" % "0.3.0" | |
import monix.eval.Task | |
import monix.execution.Scheduler | |
import fetch.monixTask.implicits._ | |
val scheduler = Scheduler.Implicits.global | |
// scheduler: monix.execution.Scheduler = monix.execution.schedulers.AsyncScheduler@3fed2615 | |
val task = Fetch.run[Task](homePage) | |
// task: monix.eval.Task[(List[Post], Map[PostTopic,Int])] = BindSuspend(<function0>,<function1>) | |
Await.result(task.runAsync(scheduler), Duration.Inf) | |
// ~~> [97] Many Posts OneAnd(1,List(2, 3)) | |
// <~~ [97] Many Posts OneAnd(1,List(2, 3)) | |
// ~~> [96] Many Users OneAnd(2,List(3)) | |
// ~~> [75] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// <~~ [96] Many Users OneAnd(2,List(3)) | |
// <~~ [75] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// res49: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1)) | |
val ioSched = Scheduler.io(name="io-scheduler") | |
// ioSched: monix.execution.Scheduler = monix.execution.schedulers.AsyncScheduler@14ce603d | |
Await.result(task.runAsync(ioSched), Duration.Inf) | |
// ~~> [99] Many Posts OneAnd(1,List(2, 3)) | |
// <~~ [99] Many Posts OneAnd(1,List(2, 3)) | |
// ~~> [99] Many Users OneAnd(2,List(3)) | |
// ~~> [100] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// <~~ [99] Many Users OneAnd(2,List(3)) | |
// <~~ [100] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// res50: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1)) | |
import cats.{Eval, Now, Later, Always} | |
import monix.eval.Task | |
/**
 * Converts a cats `Eval` into a monix `Task`.
 *
 *  - `Now` holds an already-computed value and becomes `Task.now`.
 *  - `Always` becomes `Task.evalAlways`.
 *  - Every other `Eval` (including `Later`) becomes `Task.evalOnce`.
 *
 * `value` is only referenced inside the `Task` constructors' arguments, never
 * called eagerly here.
 */
def evalToTask[A](e: Eval[A]): Task[A] = e match {
  case Now(value)        => Task.now(value)
  case always: Always[A] => Task.evalAlways(always.value)
  case deferred          => Task.evalOnce(deferred.value)
}
import monix.execution.Cancelable | |
import scala.concurrent.duration._ | |
/**
 * Interprets a fetch `Query` as a monix `Task`.
 *
 *  - `Sync` wraps an `Eval` and is converted with `evalToTask`.
 *  - `Ap` converts both sides, zips them and applies the function to the argument.
 *  - `Async` hands the task's callback to the query's `action`, which is executed on the
 *    scheduler; a finite `timeout` is applied to the task, any other duration is ignored.
 */
def queryToTask[A](q: Query[A]): Task[A] = q match {
  case Sync(eval) =>
    evalToTask(eval)

  case Ap(qf, qx) =>
    Task.zip2(queryToTask(qf), queryToTask(qx)).map { case (f, x) => f(x) }

  case Async(action, timeout) =>
    val unbounded: Task[A] = Task.create { (sched, cb) =>
      sched.execute(new Runnable {
        def run(): Unit = action(cb.onSuccess, cb.onError)
      })
      // Nothing is registered for cancellation.
      Cancelable.empty
    }
    timeout match {
      case limit: FiniteDuration => unbounded.timeout(limit)
      case _                     => unbounded
    }
}
/**
 * `FetchMonadError` instance that lets a `Fetch` be run into a monix `Task`.
 *
 * Each operation delegates to the corresponding `Task` combinator; `Eval`s and
 * `Query`s are translated with `evalToTask` and `queryToTask`.
 */
implicit val taskFetchMonadError: FetchMonadError[Task] = new FetchMonadError[Task] {

  // --- lifting values, evaluations and queries ---

  def pure[A](x: A): Task[A] =
    Task.now(x)

  override def pureEval[A](e: Eval[A]): Task[A] =
    evalToTask(e)

  override def runQuery[A](q: Query[A]): Task[A] =
    queryToTask(q)

  // --- sequencing and combining ---

  override def map[A, B](fa: Task[A])(f: A => B): Task[B] =
    fa.map(f)

  def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] =
    fa.flatMap(f)

  override def product[A, B](fa: Task[A], fb: Task[B]): Task[(A, B)] = {
    // introduce parallelism with Task#fork
    val left  = Task.fork(fa)
    val right = Task.fork(fb)
    Task.zip2(left, right)
  }

  // --- errors ---

  def raiseError[A](e: Throwable): Task[A] =
    Task.raiseError(e)

  def handleErrorWith[A](fa: Task[A])(f: Throwable => Task[A]): Task[A] =
    fa.onErrorHandleWith(f)
}
val task = Fetch.run(homePage)(taskFetchMonadError) | |
// task: monix.eval.Task[(List[Post], Map[PostTopic,Int])] = BindSuspend(<function0>,<function1>) | |
Await.result(task.runAsync(scheduler), Duration.Inf) | |
// ~~> [96] Many Posts OneAnd(1,List(2, 3)) | |
// <~~ [96] Many Posts OneAnd(1,List(2, 3)) | |
// ~~> [94] Many Users OneAnd(2,List(3)) | |
// ~~> [96] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// <~~ [94] Many Users OneAnd(2,List(3)) | |
// <~~ [96] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article))) | |
// res53: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment