Skip to content

Instantly share code, notes, and snippets.

@jdesiloniz
Created November 4, 2016 11:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdesiloniz/e361b107c347455a532c2006da13a6d3 to your computer and use it in GitHub Desktop.
Save jdesiloniz/e361b107c347455a532c2006da13a6d3 to your computer and use it in GitHub Desktop.
dassdadsasd
import cats.data.NonEmptyList
import fetch._
trait DataSource[Identity, Result]{
def fetchOne(id: Identity): Query[Option[Result]]
def fetchMany(ids: NonEmptyList[Identity]): Query[Map[Identity, Result]]
}
type UserId = Int
case class User(id: UserId, username: String)
def latency[A](result: A, msg: String) = {
val id = Thread.currentThread.getId
println(s"~~> [$id] $msg")
Thread.sleep(100)
println(s"<~~ [$id] $msg")
result
}
import cats.data.NonEmptyList
import cats.std.list._
import fetch._
val userDatabase: Map[UserId, User] = Map(
1 -> User(1, "@one"),
2 -> User(2, "@two"),
3 -> User(3, "@three"),
4 -> User(4, "@four")
)
implicit object UserSource extends DataSource[UserId, User]{
override def fetchOne(id: UserId): Query[Option[User]] = {
Query.sync({
latency(userDatabase.get(id), s"One User $id")
})
}
override def fetchMany(ids: NonEmptyList[UserId]): Query[Map[UserId, User]] = {
Query.sync({
latency(userDatabase.filterKeys(ids.unwrap.contains), s"Many Users $ids")
})
}
}
def getUser(id: UserId): Fetch[User] = Fetch(id) // or, more explicitly: Fetch(id)(UserSource)
implicit object UnbatchedSource extends DataSource[Int, Int]{
override def fetchOne(id: Int): Query[Option[Int]] = {
Query.sync(Option(id))
}
override def fetchMany(ids: NonEmptyList[Int]): Query[Map[Int, Int]] = {
batchingNotSupported(ids)
}
}
val fetchUser: Fetch[User] = getUser(1)
import cats.Id
import fetch.unsafe.implicits._
import fetch.syntax._
fetchUser.runA[Id]
// ~~> [92] One User 1
// <~~ [92] One User 1
// res3: cats.Id[User] = User(1,@one)
val fetchTwoUsers: Fetch[(User, User)] = for {
aUser <- getUser(1)
anotherUser <- getUser(aUser.id + 1)
} yield (aUser, anotherUser)
fetchTwoUsers.runA[Id]
// ~~> [92] One User 1
// <~~ [92] One User 1
// ~~> [92] One User 2
// <~~ [92] One User 2
// res4: cats.Id[(User, User)] = (User(1,@one),User(2,@two))
import cats.syntax.cartesian._
val fetchProduct: Fetch[(User, User)] = getUser(1).product(getUser(2))
fetchProduct.runA[Id]
// ~~> [92] Many Users OneAnd(1,List(2))
// <~~ [92] Many Users OneAnd(1,List(2))
// res6: cats.Id[(User, User)] = (User(1,@one),User(2,@two))
val fetchDuped: Fetch[(User, User)] = getUser(1).product(getUser(1))
fetchDuped.runA[Id]
// ~~> [92] One User 1
// <~~ [92] One User 1
// res7: cats.Id[(User, User)] = (User(1,@one),User(1,@one))
val fetchCached: Fetch[(User, User)] = for {
aUser <- getUser(1)
anotherUser <- getUser(1)
} yield (aUser, anotherUser)
fetchCached.runA[Id]
// ~~> [92] One User 1
// <~~ [92] One User 1
// res8: cats.Id[(User, User)] = (User(1,@one),User(1,@one))
Query.sync(42)
// res9: fetch.Query[Int] = Sync(cats.Later@52caf59b)
Query.sync({ println("Computing 42"); 42 })
// res10: fetch.Query[Int] = Sync(cats.Later@7ffb6457)
import cats.Eval
// import cats.Eval
Query.eval(Eval.always({ println("Computing 42"); 42 }))
// res11: fetch.Query[Int] = Sync(cats.Always@3e6bbcd5)
Query.async((ok: (Int => Unit), fail) => {
Thread.sleep(100)
ok(42)
})
// res12: fetch.Query[Int] = Async(<function2>,Duration.Inf)
type PostId = Int
case class Post(id: PostId, author: UserId, content: String)
val postDatabase: Map[PostId, Post] = Map(
1 -> Post(1, 2, "An article"),
2 -> Post(2, 3, "Another article"),
3 -> Post(3, 4, "Yet another article")
)
implicit object PostSource extends DataSource[PostId, Post]{
override def fetchOne(id: PostId): Query[Option[Post]] = {
Query.sync({
latency(postDatabase.get(id), s"One Post $id")
})
}
override def fetchMany(ids: NonEmptyList[PostId]): Query[Map[PostId, Post]] = {
Query.sync({
latency(postDatabase.filterKeys(ids.unwrap.contains), s"Many Posts $ids")
})
}
}
def getPost(id: PostId): Fetch[Post] = Fetch(id)
def getAuthor(p: Post): Fetch[User] = Fetch(p.author)
type PostTopic = String
implicit object PostTopicSource extends DataSource[Post, PostTopic]{
override def fetchOne(id: Post): Query[Option[PostTopic]] = {
Query.sync({
val topic = if (id.id % 2 == 0) "monad" else "applicative"
latency(Option(topic), s"One Post Topic $id")
})
}
override def fetchMany(ids: NonEmptyList[Post]): Query[Map[Post, PostTopic]] = {
Query.sync({
val result = ids.unwrap.map(id => (id, if (id.id % 2 == 0) "monad" else "applicative")).toMap
latency(result, s"Many Post Topics $ids")
})
}
}
def getPostTopic(post: Post): Fetch[PostTopic] = Fetch(post)
val fetchMulti: Fetch[(Post, PostTopic)] = for {
post <- getPost(1)
topic <- getPostTopic(post)
} yield (post, topic)
fetchMulti.runA[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// ~~> [92] One Post Topic Post(1,2,An article)
// <~~ [92] One Post Topic Post(1,2,An article)
// res16: cats.Id[(Post, PostTopic)] = (Post(1,2,An article),applicative)
val fetchConcurrent: Fetch[(Post, User)] = getPost(1).product(getUser(2))
fetchConcurrent.runA[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// ~~> [92] One User 2
// <~~ [92] One User 2
// res17: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two))
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import fetch.implicits._
// import fetch.implicits._
Await.result(fetchConcurrent.runA[Future], Duration.Inf)
// ~~> [93] One Post 1
// ~~> [75] One User 2
// <~~ [93] One Post 1
// <~~ [75] One User 2
// res18: (Post, User) = (Post(1,2,An article),User(2,@two))
import cats.std.list._
import cats.syntax.traverse._
val fetchSequence: Fetch[List[User]] = List(getUser(1), getUser(2), getUser(3)).sequence
fetchSequence.runA[Id]
// ~~> [92] Many Users OneAnd(1,List(2, 3))
// <~~ [92] Many Users OneAnd(1,List(2, 3))
// res20: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three))
val fetchTraverse: Fetch[List[User]] = List(1, 2, 3).traverse(getUser)
fetchTraverse.runA[Id]
// ~~> [92] Many Users OneAnd(1,List(2, 3))
// <~~ [92] Many Users OneAnd(1,List(2, 3))
// res21: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three))
val cache = InMemoryCache(UserSource.identity(1) -> User(1, "@dialelo"))
// cache: fetch.InMemoryCache = InMemoryCache(Map((UserSource$@36695a49,1) -> User(1,@dialelo)))
Fetch.run[Id](fetchUser, cache)
// res22: cats.Id[User] = User(1,@dialelo)
fetchUser.runA[Id](cache)
// res23: cats.Id[User] = User(1,@dialelo)
val fetchManyUsers: Fetch[List[User]] = List(1, 2, 3).traverse(getUser)
fetchManyUsers.runA[Id](cache)
// ~~> [92] Many Users OneAnd(2,List(3))
// <~~ [92] Many Users OneAnd(2,List(3))
// res24: cats.Id[List[User]] = List(User(1,@dialelo), User(2,@two), User(3,@three))
val env = fetchManyUsers.runE[Id]
// ~~> [92] Many Users OneAnd(1,List(2, 3))
// <~~ [92] Many Users OneAnd(1,List(2, 3))
// env: cats.Id[fetch.FetchEnv] = FetchEnv(InMemoryCache(Map((UserSource$@36695a49,1) -> User(1,@one), (UserSource$@36695a49,2) -> User(2,@two), (UserSource$@36695a49,3) -> User(3,@three))),List(),Queue(Round(InMemoryCache(Map()),Concurrent,ConcurrentRound(Map(UserSource$@36695a49 -> List(1, 2, 3))),95471526461769,95471630826900,false)))
fetchManyUsers.runA[Id](env.cache)
// res25: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three))
trait DataSourceCache {
def update[A](k: DataSourceIdentity, v: A): DataSourceCache
def get[A](k: DataSourceIdentity): Option[A]
}
final case class ForgetfulCache() extends DataSourceCache {
override def get[A](k: DataSourceIdentity): Option[A] = None
override def update[A](k: DataSourceIdentity, v: A): ForgetfulCache = this
}
val fetchSameTwice: Fetch[(User, User)] = for {
one <- getUser(1)
another <- getUser(1)
} yield (one, another)
// fetchSameTwice: fetch.Fetch[(User, User)] = Gosub(Suspend(FetchOne(1,UserSource$@36695a49)),<function1>)
fetchSameTwice.runA[Id](ForgetfulCache())
// ~~> [92] One User 1
// <~~ [92] One User 1
// ~~> [92] One User 1
// <~~ [92] One User 1
// res26: cats.Id[(User, User)] = (User(1,@one),User(1,@one))
val fetchError: Fetch[User] = (new Exception("Oh noes")).fetch
scala> fetchError.runA[Id]
java.lang.Exception: Oh noes
... 1024 elided
import fetch.unsafe.implicits._
import cats.Eval
// import cats.Eval
import cats.data.Xor
// import cats.data.Xor
val safeResult: Eval[Throwable Xor User] = FetchMonadError[Eval].attempt(fetchError.runA[Eval])
// safeResult: cats.Eval[cats.data.Xor[Throwable,User]] = cats.Later@78cef96d
safeResult.value
// res28: cats.data.Xor[Throwable,User] = Left(java.lang.Exception: Oh noes)
import cats.syntax.applicativeError._
// import cats.syntax.applicativeError._
fetchError.runA[Eval].attempt.value
// res29: cats.data.Xor[Throwable,User] = Left(java.lang.Exception: Oh noes)
val fetchPure: Fetch[Int] = 42.fetch
fetchPure.runA[Id]
// res30: cats.Id[Int] = 42
val fetchFail: Fetch[Int] = new Exception("Something went terribly wrong").fetch
scala> fetchFail.runA[Id]
java.lang.Exception: Something went terribly wrong
... 1024 elided
val fetchJoined: Fetch[(Post, User)] = getPost(1).join(getUser(2))
fetchJoined.runA[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// ~~> [92] One User 2
// <~~ [92] One User 2
// res32: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two))
getPost(1).runA[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// res33: cats.Id[Post] = Post(1,2,An article)
getPost(1).runE[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// res34: cats.Id[fetch.FetchEnv] = FetchEnv(InMemoryCache(Map((PostSource$@5bb1530,1) -> Post(1,2,An article))),List(1),Queue(Round(InMemoryCache(Map()),PostSource$@5bb1530,OneRound(1),95475218554730,95475319980908,false)))
getPost(1).runF[Id]
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// res35: cats.Id[(fetch.FetchEnv, Post)] = (FetchEnv(InMemoryCache(Map((PostSource$@5bb1530,1) -> Post(1,2,An article))),List(1),Queue(Round(InMemoryCache(Map()),PostSource$@5bb1530,OneRound(1),95475458907291,95475559950380,false))),Post(1,2,An article))
val fetchPure: Fetch[Int] = Fetch.pure(42)
Fetch.run[Id](fetchPure)
// res36: cats.Id[Int] = 42
val fetchFail: Fetch[Int] = Fetch.error(new Exception("Something went terribly wrong"))
scala> Fetch.run[Id](fetchFail)
java.lang.Exception: Something went terribly wrong
... 1024 elided
val fetchJoined: Fetch[(Post, User)] = Fetch.join(getPost(1), getUser(2))
Fetch.run[Id](fetchJoined)
// ~~> [92] One Post 1
// <~~ [92] One Post 1
// ~~> [92] One User 2
// <~~ [92] One User 2
// res38: cats.Id[(Post, User)] = (Post(1,2,An article),User(2,@two))
val fetchSequence: Fetch[List[User]] = Fetch.sequence(List(getUser(1), getUser(2), getUser(3)))
Fetch.run[Id](fetchSequence)
// ~~> [92] Many Users OneAnd(1,List(2, 3))
// <~~ [92] Many Users OneAnd(1,List(2, 3))
// res39: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three))
val fetchTraverse: Fetch[List[User]] = Fetch.traverse(List(1, 2, 3))(getUser)
Fetch.run[Id](fetchTraverse)
// ~~> [92] Many Users OneAnd(1,List(2, 3))
// <~~ [92] Many Users OneAnd(1,List(2, 3))
// res40: cats.Id[List[User]] = List(User(1,@one), User(2,@two), User(3,@three))
import cats.syntax.cartesian._
val fetchThree: Fetch[(Post, User, Post)] = (getPost(1) |@| getUser(2) |@| getPost(2)).tupled
fetchThree.runA[Id]
// ~~> [92] Many Posts OneAnd(1,List(2))
// <~~ [92] Many Posts OneAnd(1,List(2))
// ~~> [92] One User 2
// <~~ [92] One User 2
// res42: cats.Id[(Post, User, Post)] = (Post(1,2,An article),User(2,@two),Post(2,3,Another article))
val fetchFriends: Fetch[String] = (getUser(1) |@| getUser(2)).map({ (one, other) =>
s"${one.username} is friends with ${other.username}"
})
// fetchFriends: fetch.Fetch[String] = Gosub(Gosub(Suspend(Concurrent(List(FetchMany(OneAnd(1,List(2)),UserSource$@36695a49)))),<function1>),<function1>)
fetchFriends.runA[Id]
// ~~> [92] Many Users OneAnd(1,List(2))
// <~~ [92] Many Users OneAnd(1,List(2))
// res43: cats.Id[String] = @one is friends with @two
val fetchFriends: Fetch[String] = Fetch.join(getUser(1), getUser(2)).map({ case (one, other) =>
s"${one.username} is friends with ${other.username}"
})
// fetchFriends: fetch.Fetch[String] = Gosub(Gosub(Suspend(Concurrent(List(FetchMany(OneAnd(1,List(2)),UserSource$@36695a49)))),<function1>),<function1>)
fetchFriends.runA[Id]
// ~~> [92] Many Users OneAnd(1,List(2))
// <~~ [92] Many Users OneAnd(1,List(2))
// res44: cats.Id[String] = @one is friends with @two
val postsByAuthor: Fetch[List[Post]] = for {
posts <- List(1, 2).traverse(getPost)
authors <- posts.traverse(getAuthor)
ordered = (posts zip authors).sortBy({ case (_, author) => author.username }).map(_._1)
} yield ordered
val postTopics: Fetch[Map[PostTopic, Int]] = for {
posts <- List(2, 3).traverse(getPost)
topics <- posts.traverse(getPostTopic)
countByTopic = (posts zip topics).groupBy(_._2).mapValues(_.size)
} yield countByTopic
val homePage = (postsByAuthor |@| postTopics).tupled
Await.result(Fetch.run[Future](homePage), Duration.Inf)
// ~~> [94] Many Posts OneAnd(1,List(2, 3))
// <~~ [94] Many Posts OneAnd(1,List(2, 3))
// ~~> [75] Many Users OneAnd(2,List(3))
// ~~> [94] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// <~~ [75] Many Users OneAnd(2,List(3))
// <~~ [94] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// res47: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1))
"com.fortysevendeg" %% "fetch-monix" % "0.3.0"
import monix.eval.Task
import monix.execution.Scheduler
import fetch.monixTask.implicits._
val scheduler = Scheduler.Implicits.global
// scheduler: monix.execution.Scheduler = monix.execution.schedulers.AsyncScheduler@3fed2615
val task = Fetch.run[Task](homePage)
// task: monix.eval.Task[(List[Post], Map[PostTopic,Int])] = BindSuspend(<function0>,<function1>)
Await.result(task.runAsync(scheduler), Duration.Inf)
// ~~> [97] Many Posts OneAnd(1,List(2, 3))
// <~~ [97] Many Posts OneAnd(1,List(2, 3))
// ~~> [96] Many Users OneAnd(2,List(3))
// ~~> [75] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// <~~ [96] Many Users OneAnd(2,List(3))
// <~~ [75] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// res49: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1))
val ioSched = Scheduler.io(name="io-scheduler")
// ioSched: monix.execution.Scheduler = monix.execution.schedulers.AsyncScheduler@14ce603d
Await.result(task.runAsync(ioSched), Duration.Inf)
// ~~> [99] Many Posts OneAnd(1,List(2, 3))
// <~~ [99] Many Posts OneAnd(1,List(2, 3))
// ~~> [99] Many Users OneAnd(2,List(3))
// ~~> [100] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// <~~ [99] Many Users OneAnd(2,List(3))
// <~~ [100] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// res50: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1))
import cats.{Eval, Now, Later, Always}
import monix.eval.Task
def evalToTask[A](e: Eval[A]): Task[A] = e match {
case Now(x) => Task.now(x)
case l: Later[A] => Task.evalOnce(l.value)
case a: Always[A] => Task.evalAlways(a.value)
case other => Task.evalOnce(other.value)
}
import monix.execution.Cancelable
import scala.concurrent.duration._
def queryToTask[A](q: Query[A]): Task[A] = q match {
case Sync(e) => evalToTask(e)
case Async(action, timeout) => {
val task: Task[A] = Task.create((scheduler, callback) => {
scheduler.execute(new Runnable {
def run() = action(callback.onSuccess, callback.onError)
})
Cancelable.empty
})
timeout match {
case finite: FiniteDuration => task.timeout(finite)
case _ => task
}
}
case Ap(qf, qx) => Task.zip2(queryToTask(qf), queryToTask(qx)).map({ case (f, x) => f(x) })
}
implicit val taskFetchMonadError: FetchMonadError[Task] = new FetchMonadError[Task] {
override def map[A, B](fa: Task[A])(f: A => B): Task[B] =
fa.map(f)
override def product[A, B](fa: Task[A], fb: Task[B]): Task[(A, B)] =
Task.zip2(Task.fork(fa), Task.fork(fb)) // introduce parallelism with Task#fork
override def pureEval[A](e: Eval[A]): Task[A] = evalToTask(e)
def pure[A](x: A): Task[A] =
Task.now(x)
def handleErrorWith[A](fa: Task[A])(f: Throwable => Task[A]): Task[A] =
fa.onErrorHandleWith(f)
def raiseError[A](e: Throwable): Task[A] =
Task.raiseError(e)
def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] =
fa.flatMap(f)
override def runQuery[A](q: Query[A]): Task[A] = queryToTask(q)
}
val task = Fetch.run(homePage)(taskFetchMonadError)
// task: monix.eval.Task[(List[Post], Map[PostTopic,Int])] = BindSuspend(<function0>,<function1>)
Await.result(task.runAsync(scheduler), Duration.Inf)
// ~~> [96] Many Posts OneAnd(1,List(2, 3))
// <~~ [96] Many Posts OneAnd(1,List(2, 3))
// ~~> [94] Many Users OneAnd(2,List(3))
// ~~> [96] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// <~~ [94] Many Users OneAnd(2,List(3))
// <~~ [96] Many Post Topics OneAnd(Post(2,3,Another article),List(Post(3,4,Yet another article)))
// res53: (List[Post], Map[PostTopic,Int]) = (List(Post(2,3,Another article), Post(1,2,An article)),Map(monad -> 1, applicative -> 1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment