Skip to content

Instantly share code, notes, and snippets.

@byarr
Last active Aug 3, 2021
Embed
What would you like to do?
Fetch Caching oddity
package fetch.test
import cats.data.NonEmptyList
import cats.effect.{Async, Concurrent, IO, Sync}
import fetch.{Data, DataSource, Fetch}
import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.concurrent.ExecutionContext
import scala.concurrent.duration
import cats.{Applicative, Monad}
import cats.effect.{Async, IO, unsafe}
import fetch.Fetch
import trust.ato.Action.Action
import cats.syntax.all._
import fetch._
import trust.ato.Ruler.atoRequest
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
object Cache {
def latency[F[_] : Sync](msg: String): F[Unit] = for {
_ <- Sync[F].delay(println(s"--> [${Thread.currentThread.getId}] $msg"))
_ <- Sync[F].delay(Thread.sleep(100))
_ <- Sync[F].delay(println(s"<-- [${Thread.currentThread.getId}] $msg"))
} yield ()
// users
type UserId = Int
case class User(id: UserId, username: String)
val userDatabase: Map[UserId, User] = Map(
1 -> User(1, "@one"),
2 -> User(2, "@two"),
3 -> User(3, "@three"),
4 -> User(4, "@four")
)
object Users extends Data[UserId, User] {
def name = "Users"
def source[F[_] : Async]: DataSource[F, UserId, User] = new DataSource[F, UserId, User] {
override def data = Users
override def CF = Concurrent[F]
override def fetch(id: UserId): F[Option[User]] =
latency[F](s"One User $id") >> CF.pure(userDatabase.get(id))
override def batch(ids: NonEmptyList[UserId]): F[Map[UserId, User]] =
latency[F](s"Batch Users $ids") >> CF.pure(userDatabase.view.filterKeys(ids.toList.toSet).toMap)
}
}
def getUser[F[_] : Async](id: UserId): Fetch[F, User] =
Fetch(id, Users.source)
// posts
type PostId = Int
case class Post(id: PostId, author: UserId, content: String)
val postDatabase: Map[PostId, Post] = Map(
1 -> Post(1, 2, "An article"),
2 -> Post(2, 3, "Another article"),
3 -> Post(3, 4, "Yet another article")
)
object Posts extends Data[PostId, Post] {
def name = "Posts"
def source[F[_] : Async]: DataSource[F, PostId, Post] = new DataSource[F, PostId, Post] {
override def data = Posts
override def CF = Concurrent[F]
override def fetch(id: PostId): F[Option[Post]] =
latency[F](s"One Post $id") >> CF.pure(postDatabase.get(id))
override def batch(ids: NonEmptyList[PostId]): F[Map[PostId, Post]] =
latency[F](s"Batch Posts $ids") >> CF.pure(postDatabase.view.filterKeys(ids.toList.toSet).toMap)
}
}
def getPost[F[_] : Async](id: PostId): Fetch[F, Post] =
Fetch(id, Posts.source)
val executor = new ScheduledThreadPoolExecutor(4)
val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
import cats.effect.unsafe.implicits.global
def inspectCache(dataCache: DataCache[IO]): Unit = {
val inmemory = dataCache.asInstanceOf[InMemoryCache[IO]]
println(s"Cached items count: ${inmemory.state.size}")
println(s"Cached items: ${inmemory.state}")
}
def main(args: Array[String]): Unit = {
val fetchUsers = (getUser[IO](1), getUser[IO](2)).tupled
val (_, userCache, _) = Fetch.runAll[IO](fetchUsers).unsafeRunTimed(5.seconds).get
inspectCache(userCache)
val fetchUserPost = (getUser[IO](1), getPost[IO](1)).tupled
val (_, userPostCache, _) = Fetch.runAll[IO](fetchUserPost).unsafeRunTimed(5.seconds).get
inspectCache(userPostCache)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment