Skip to content

Instantly share code, notes, and snippets.

@jenshalm
Last active September 30, 2019 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jenshalm/702122b0a9ddadf9e8b1f37638817489 to your computer and use it in GitHub Desktop.
Save jenshalm/702122b0a9ddadf9e8b1f37638817489 to your computer and use it in GitHub Desktop.
Example for caching with expiry in a referentially transparent way using cats-effect
/*
* Copyright 2019 - Jens Halm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.kiwipowered.stakzcore.client
import java.util.concurrent.TimeUnit
import cats.Monad
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Async, Clock}
import cats.implicits._
import Cache._
import scala.concurrent.duration.FiniteDuration
/** A cache for effectful values of type `V`, stored per key of type `K`.
  *
  * Entries live in `ref` as a `Deferred` (so that concurrent callers wait for a
  * computation in progress instead of starting their own) together with a
  * `CacheSpec` describing when and in which version the entry was created.
  *
  * @param ref     holds the current map of cache entries
  * @param expiryF produces the `Expiry` used to judge entries; it is evaluated
  *                once per call of `cache` or `refresh`
  */
class KeyValueCache[F[_]: Async, K, V] (ref: Ref[F, Map[K, CachedEntry[F, V]]], expiryF: F[Expiry]) {

  type CacheMap = Map[K, CachedEntry[F, V]]

  /** Returns the entry cached for `key` if it has not expired,
    * otherwise evaluates `value`, caches its outcome and returns it.
    */
  def cache (key: K)(value: F[V]): F[V] = retrieve(key, value, expiryF)

  /** Like `cache`, but treats any existing entry for `key` as expired,
    * so that `value` is evaluated by this call.
    */
  def refresh (key: K)(value: F[V]): F[V] = {
    // Same timestamp as the regular expiry, but every entry is reported as expired.
    val forcedExpiry: F[Expiry] = expiryF.map { underlyingExpiry =>
      new Expiry {
        def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = true
        val timestamp: Long = underlyingExpiry.timestamp
      }
    }
    retrieve(key, value, forcedExpiry)
  }

  private def retrieve (key: K, valueF: F[V], expiryF: F[Expiry]): F[V] = {

    // Waits for the existing entry (if any) and validates it, otherwise creates the first entry.
    def readFromMap (map: CacheMap, expiry: Expiry): F[V] = map.get(key) match {
      case Some(CachedEntry(deferred, spec)) => deferred.get.flatMap(extractOrUpdate(_, spec, expiry))
      case None => update(expiry, CacheSpec.empty, None)
    }

    // Returns (or re-raises) a cached outcome that is still valid, otherwise triggers an update.
    def extractOrUpdate (result: Result[V], spec: CacheSpec, expiry: Expiry): F[V] = result match {
      case Right(value) if !expiry.isExpired(spec, ExpiryMode.Success) => Async[F].pure(value)
      case Left(error) if !expiry.isExpired(spec, ExpiryMode.Failure) => Async[F].raiseError(error)
      case _ => update(expiry, spec, result.toOption)
    }

    def update (expiry: Expiry, oldSpec: CacheSpec, oldValue: Option[V]): F[V] =
      Deferred.uncancelable[F, Result[V]].flatMap { deferred =>

        // Installs a new entry for this key and describes the computation that fills it.
        def newValue (oldMap: CacheMap): (CacheMap, F[V]) = {
          val newSpec = oldSpec.next(expiry)
          val newMap = oldMap.updated(key, CachedEntry(deferred, newSpec))

          // Replaces the spec of the entry installed above, unless somebody else replaced the entry.
          def recordExtension (extendedSpec: CacheSpec): F[Unit] = ref.update { currentMap =>
            currentMap.get(key) match {
              case Some(CachedEntry(_, currentSpec)) if currentSpec == newSpec =>
                currentMap.updated(key, CachedEntry(deferred, extendedSpec))
              case _ =>
                currentMap
            }
          }

          val computation: F[V] = valueF.attempt.flatMap { attempt =>
            val published: F[Result[V]] = (attempt, oldValue) match {
              case (Left(_), Some(oldValid)) if !expiry.isExpired(oldSpec, ExpiryMode.Resilience) =>
                // Keep serving the old value. The extended spec must be stored in the map,
                // otherwise later reads would treat the fallback as a fresh value.
                val fallback: Result[V] = Right(oldValid)
                recordExtension(oldSpec.extend(expiry)) *> deferred.complete(fallback).as(fallback)
              case (newResult, _) =>
                deferred.complete(newResult).as(newResult)
            }
            // The outcome was produced by this very call and is returned as is.
            // Checking it against `expiry` again would never terminate for `refresh`,
            // whose expiry reports every entry as expired.
            published.flatMap(result => Async[F].fromEither(result))
          }
          newMap -> computation
        }

        ref
          .modify[F[V]] { currentMap =>
            currentMap.get(key) match {
              case Some(CachedEntry(otherDeferred, otherSpec)) if otherSpec != oldSpec =>
                // concurrent modification: wait for the other computation and validate its outcome
                currentMap -> otherDeferred.get.flatMap(extractOrUpdate(_, otherSpec, expiry))
              case _ =>
                newValue(currentMap)
            }
          }
          .flatten
      }

    for {
      expiry <- expiryF
      cacheMap <- ref.get
      value <- readFromMap(cacheMap, expiry)
    } yield value
  }
}
/** A cache for one single value, backed by a `KeyValueCache` that only ever
  * uses the `SingleKey` key.
  *
  * @param cacheRef the keyed cache all calls are delegated to
  */
class Cache[F[_], V] (cacheRef: KeyValueCache[F, SingleKey, V]) {

  // The one and only key used with the underlying cache.
  private val slot: SingleKey = SingleKey

  /** Delegates to `KeyValueCache.cache` for the single key. */
  def cache (value: F[V]): F[V] = cacheRef.cache(slot)(value)

  /** Delegates to `KeyValueCache.refresh` for the single key. */
  def refresh (value: F[V]): F[V] = cacheRef.refresh(slot)(value)
}
/** Types used by the caches and factory methods for creating cache instances. */
object Cache {

  /** Outcome of evaluating a cached effect: the error it raised or the value it produced. */
  type Result[A] = Either[Throwable, A]

  /** A cache entry: the (possibly still pending) outcome plus the spec it was created with. */
  case class CachedEntry[F[_], A] (value: Deferred[F, Result[A]], spec: CacheSpec)

  /** Metadata of a cache entry.
    *
    * @param timestamp the timestamp of the expiry the entry was created with
    * @param version   counter that is incremented by `next` and `extend`
    * @param extended  the timestamp of the expiry passed to `extend`, if any
    */
  case class CacheSpec (timestamp: Long, version: Int, extended: Option[Long] = None) {

    /** The spec for a new entry replacing this one: new timestamp, next version, no extension. */
    def next (expiry: Expiry): CacheSpec =
      CacheSpec(timestamp = expiry.timestamp, version = version + 1)

    /** The spec for an extended entry: original timestamp, next version, extension timestamp set. */
    def extend (expiry: Expiry): CacheSpec = {
      val extendedAt = Some(expiry.timestamp)
      copy(version = version + 1, extended = extendedAt)
    }
  }

  object CacheSpec {
    /** The spec used when no entry exists yet. */
    val empty: CacheSpec = CacheSpec(timestamp = 0, version = 0)
  }

  /** The key type of a cache that holds a single value only. */
  sealed trait SingleKey
  case object SingleKey extends SingleKey

  /** The kind of check an `Expiry` is asked to perform. */
  sealed trait ExpiryMode
  object ExpiryMode {
    case object Failure extends ExpiryMode
    case object Success extends ExpiryMode
    case object Resilience extends ExpiryMode
  }

  /** Decides whether an entry with a given spec is expired, and carries the timestamp
    * that is written into the specs of entries created with it.
    */
  sealed trait Expiry {
    def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean
    def timestamp: Long
  }

  object Expiry {

    /** An expiry that reports no entry as expired; its timestamp is always 0. */
    def never[F[_]: Monad]: F[Expiry] = {
      val neverExpires: Expiry = new Expiry {
        def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = false
        def timestamp: Long = 0
      }
      Monad[F].pure(neverExpires)
    }

    /** An expiry based on the current time of `clock`, read in milliseconds whenever
      * the returned effect is evaluated.
      *
      * @param standardTTL   time-to-live applied in mode `Success`
      * @param errorTTL      time-to-live applied in mode `Failure`, and in mode `Success`
      *                      for specs that have been extended
      * @param resilienceTTL time-to-live applied in mode `Resilience`
      * @param clock         the source of the current time
      */
    def after[F[_]: Monad] (standardTTL: Long,
                            errorTTL: Long,
                            resilienceTTL: Long,
                            clock: Clock[F]): F[Expiry] = {

      def ttlFor (mode: ExpiryMode): Long = mode match {
        case ExpiryMode.Success    => standardTTL
        case ExpiryMode.Failure    => errorTTL
        case ExpiryMode.Resilience => resilienceTTL
      }

      def expiryAt (currentMillis: Long): Expiry = new Expiry {

        def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = {
          val (reference, limit) = spec.extended match {
            case Some(extendedAt) if mode == ExpiryMode.Success =>
              (extendedAt, ttlFor(ExpiryMode.Failure)) // retry after failure timeout
            case _ =>
              (spec.timestamp, ttlFor(mode))
          }
          (currentMillis - reference) > limit
        }

        def timestamp: Long = currentMillis
      }

      clock.realTime(TimeUnit.MILLISECONDS).map(expiryAt(_))
    }
  }

  /** Creates a keyed cache whose entries never expire. */
  def forKeyedValues[F[_]: Async, K, V]: F[KeyValueCache[F, K, V]] =
    forKeyedValues[F, K, V](Expiry.never[F])

  /** Creates a keyed cache using `Expiry.after` with the given durations converted to milliseconds. */
  def forKeyedValues[F[_]: Async, K, V](standardTTL: FiniteDuration,
                                        errorTTL: FiniteDuration,
                                        resilienceTTL: FiniteDuration,
                                        clock: Clock[F]): F[KeyValueCache[F, K, V]] = {
    val expiry: F[Expiry] =
      Expiry.after(standardTTL.toMillis, errorTTL.toMillis, resilienceTTL.toMillis, clock)
    forKeyedValues[F, K, V](expiry)
  }

  // Creates the keyed cache on top of a fresh, empty map of entries.
  private def forKeyedValues[F[_]: Async, K, V](expiry: F[Expiry]): F[KeyValueCache[F, K, V]] =
    for {
      entries <- Ref.of[F, Map[K, CachedEntry[F, V]]](Map.empty)
    } yield new KeyValueCache[F, K, V](entries, expiry)

  /** Creates a single-value cache whose entry never expires. */
  def forSingleValue[F[_]: Async, V]: F[Cache[F, V]] =
    forSingleValue[F, V](Expiry.never[F])

  /** Creates a single-value cache using `Expiry.after` with the given durations converted to milliseconds. */
  def forSingleValue[F[_]: Async, V](standardTTL: FiniteDuration,
                                     errorTTL: FiniteDuration,
                                     resilienceTTL: FiniteDuration,
                                     clock: Clock[F]): F[Cache[F, V]] = {
    val expiry: F[Expiry] =
      Expiry.after(standardTTL.toMillis, errorTTL.toMillis, resilienceTTL.toMillis, clock)
    forSingleValue[F, V](expiry)
  }

  // Wraps a keyed cache that is only ever used with `SingleKey`.
  private def forSingleValue[F[_]: Async, V](expiry: F[Expiry]): F[Cache[F, V]] =
    forKeyedValues[F, SingleKey, V](expiry).map(keyed => new Cache[F, V](keyed))
}
package com.kiwipowered.stakzcore.client
import cats.effect.IO
import org.scalatest.{FunSuite, Matchers}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/** Tests for the single-value and keyed caches with time-based expiry.
  *
  * The tests run against the real clock and use short sleeps to let entries expire.
  */
class CacheSpec extends FunSuite with Matchers {

  /** Yields 1, 2, 3, ... on successive evaluations of `next`. */
  class Counter {
    private val numbers: Iterator[Int] = Iterator.from(1)
    val next: IO[Int] = IO(numbers.next())
  }

  /** Like `Counter`, but every evaluation that would yield an even number fails instead. */
  class FlakyCounter {
    private val numbers: Iterator[Int] = Iterator.from(1)
    val next: IO[Int] = IO(numbers.next()).flatMap { n =>
      if (n % 2 == 0) IO.raiseError[Int](new RuntimeException("Evil even number encountered"))
      else IO.pure(n)
    }
  }

  private val timer = IO.timer(ExecutionContext.global)

  // Maps any failure to 0 so that errors show up as a value in the expected tuples.
  private def zeroOnError (io: IO[Int]): IO[Int] = io.handleErrorWith(_ => IO.pure(0))

  test("caching a single value") {
    val source = new Counter
    val program = for {
      cache  <- Cache.forSingleValue[IO, Int](150.millis, 5.millis, 300.millis, timer.clock)
      cached =  cache.cache(source.next)
      first  <- cached
      second <- cached
      _      <- timer.sleep(300.millis) // longer than the standard TTL
      third  <- cached
      fourth <- cached
    } yield (first, second, third, fourth)
    val expected = (1, 1, 2, 2)
    program.unsafeRunSync() shouldBe expected
  }

  test("caching key-value pairs") {
    val fooSource = new Counter
    val barSource = new Counter
    val program = for {
      cache <- Cache.forKeyedValues[IO, String, Int](150.millis, 5.millis, 300.millis, timer.clock)
      foo   =  cache.cache("foo")(fooSource.next)
      bar   =  cache.cache("bar")(barSource.next)
      foo1  <- foo
      foo2  <- foo
      bar1  <- bar
      bar2  <- bar
      _     <- timer.sleep(300.millis) // longer than the standard TTL
      foo3  <- foo
      foo4  <- foo
      bar3  <- bar
      bar4  <- bar
    } yield (foo1, foo2, bar1, bar2, foo3, foo4, bar3, bar4)
    val expected = (1, 1, 1, 1, 2, 2, 2, 2)
    program.unsafeRunSync() shouldBe expected
  }

  test("caching key-value pairs with fresh IO instances") {
    val nextFoo = (new Counter).next
    val nextBar = (new Counter).next
    // In contrast to the previous test, the cached IO is built anew for every access.
    val program = for {
      cache <- Cache.forKeyedValues[IO, String, Int](150.millis, 5.millis, 300.millis, timer.clock)
      foo1  <- cache.cache("foo")(nextFoo)
      foo2  <- cache.cache("foo")(nextFoo)
      bar1  <- cache.cache("bar")(nextBar)
      bar2  <- cache.cache("bar")(nextBar)
      _     <- timer.sleep(300.millis) // longer than the standard TTL
      foo3  <- cache.cache("foo")(nextFoo)
      foo4  <- cache.cache("foo")(nextFoo)
      bar3  <- cache.cache("bar")(nextBar)
      bar4  <- cache.cache("bar")(nextBar)
    } yield (foo1, foo2, bar1, bar2, foo3, foo4, bar3, bar4)
    val expected = (1, 1, 1, 1, 2, 2, 2, 2)
    program.unsafeRunSync() shouldBe expected
  }

  test("caching a single value that occasionally fails") {
    val source = new FlakyCounter
    val program = for {
      cache  <- Cache.forSingleValue[IO, Int](150.millis, 50.millis, 200.millis, timer.clock)
      cached =  zeroOnError(cache.cache(source.next))
      first  <- cached
      _      <- timer.sleep(300.millis) // longer than both the standard and the resilience TTL
      second <- cached
      third  <- cached
      _      <- timer.sleep(100.millis) // longer than the error TTL
      fourth <- cached
      fifth  <- cached
    } yield (first, second, third, fourth, fifth)
    val expected = (1, 0, 0, 3, 3)
    program.unsafeRunSync() shouldBe expected
  }

  test("use resilience TTL for a value that fails") {
    val source = new FlakyCounter
    val program = for {
      cache  <- Cache.forSingleValue[IO, Int](100.millis, 50.millis, 200.millis, timer.clock)
      cached =  zeroOnError(cache.cache(source.next))
      first  <- cached
      _      <- timer.sleep(150.millis) // longer than the standard TTL, shorter than the resilience TTL
      second <- cached
      third  <- cached
      _      <- timer.sleep(100.millis)
      fourth <- cached
      fifth  <- cached
    } yield (first, second, third, fourth, fifth)
    val expected = (1, 1, 1, 3, 3)
    program.unsafeRunSync() shouldBe expected
  }

  test("retry resilience cache after failure TTL") {
    val source = new FlakyCounter
    val program = for {
      cache  <- Cache.forSingleValue[IO, Int](100.millis, 20.millis, 500.millis, timer.clock)
      cached =  zeroOnError(cache.cache(source.next))
      first  <- cached
      _      <- timer.sleep(150.millis) // longer than the standard TTL, shorter than the resilience TTL
      second <- cached
      third  <- cached
      _      <- timer.sleep(100.millis)
      fourth <- cached
      fifth  <- cached
    } yield (first, second, third, fourth, fifth)
    val expected = (1, 1, 1, 3, 3)
    program.unsafeRunSync() shouldBe expected
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment