-
-
Save byarr/1491b0715dac8cc1f222f6e6fc7f852b to your computer and use it in GitHub Desktop.
Fetch Caching oddity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fetch.test | |
import cats.data.NonEmptyList | |
import cats.effect.{Async, Concurrent, IO, Sync} | |
import fetch.{Data, DataSource, Fetch} | |
import java.util.concurrent.ScheduledThreadPoolExecutor | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration | |
import cats.{Applicative, Monad} | |
import cats.effect.{Async, IO, unsafe} | |
import fetch.Fetch | |
import trust.ato.Action.Action | |
import cats.syntax.all._ | |
import fetch._ | |
import trust.ato.Ruler.atoRequest | |
import scala.collection.mutable | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration.DurationInt | |
import scala.language.postfixOps | |
object Cache { | |
def latency[F[_] : Sync](msg: String): F[Unit] = for { | |
_ <- Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] $msg")) | |
_ <- Sync[F].delay(Thread.sleep(100)) | |
_ <- Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] $msg")) | |
} yield () | |
// users | |
type UserId = Int | |
case class User(id: UserId, username: String) | |
val userDatabase: Map[UserId, User] = Map( | |
1 -> User(1, "@one"), | |
2 -> User(2, "@two"), | |
3 -> User(3, "@three"), | |
4 -> User(4, "@four") | |
) | |
object Users extends Data[UserId, User] { | |
def name = "Users" | |
def source[F[_] : Async]: DataSource[F, UserId, User] = new DataSource[F, UserId, User] { | |
override def data = Users | |
override def CF = Concurrent[F] | |
override def fetch(id: UserId): F[Option[User]] = | |
latency[F](s"One User $id") >> CF.pure(userDatabase.get(id)) | |
override def batch(ids: NonEmptyList[UserId]): F[Map[UserId, User]] = | |
latency[F](s"Batch Users $ids") >> CF.pure(userDatabase.view.filterKeys(ids.toList.toSet).toMap) | |
} | |
} | |
def getUser[F[_] : Async](id: UserId): Fetch[F, User] = | |
Fetch(id, Users.source) | |
// posts | |
type PostId = Int | |
case class Post(id: PostId, author: UserId, content: String) | |
val postDatabase: Map[PostId, Post] = Map( | |
1 -> Post(1, 2, "An article"), | |
2 -> Post(2, 3, "Another article"), | |
3 -> Post(3, 4, "Yet another article") | |
) | |
object Posts extends Data[PostId, Post] { | |
def name = "Posts" | |
def source[F[_] : Async]: DataSource[F, PostId, Post] = new DataSource[F, PostId, Post] { | |
override def data = Posts | |
override def CF = Concurrent[F] | |
override def fetch(id: PostId): F[Option[Post]] = | |
latency[F](s"One Post $id") >> CF.pure(postDatabase.get(id)) | |
override def batch(ids: NonEmptyList[PostId]): F[Map[PostId, Post]] = | |
latency[F](s"Batch Posts $ids") >> CF.pure(postDatabase.view.filterKeys(ids.toList.toSet).toMap) | |
} | |
} | |
def getPost[F[_] : Async](id: PostId): Fetch[F, Post] = | |
Fetch(id, Posts.source) | |
val executor = new ScheduledThreadPoolExecutor(4) | |
val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor) | |
import cats.effect.unsafe.implicits.global | |
def inspectCache(dataCache: DataCache[IO]): Unit = { | |
val inmemory = dataCache.asInstanceOf[InMemoryCache[IO]] | |
println(s"Cached items count: ${inmemory.state.size}") | |
println(s"Cached items: ${inmemory.state}") | |
} | |
def main(args: Array[String]): Unit = { | |
val fetchUsers = (getUser[IO](1), getUser[IO](2)).tupled | |
val (_, userCache, _) = Fetch.runAll[IO](fetchUsers).unsafeRunTimed(5.seconds).get | |
inspectCache(userCache) | |
val fetchUserPost = (getUser[IO](1), getPost[IO](1)).tupled | |
val (_, userPostCache, _) = Fetch.runAll[IO](fetchUserPost).unsafeRunTimed(5.seconds).get | |
inspectCache(userPostCache) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment