Last active
September 30, 2019 17:04
-
-
Save jenshalm/702122b0a9ddadf9e8b1f37638817489 to your computer and use it in GitHub Desktop.
Example for caching with expiry in a referentially transparent way using cats-effect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 - Jens Halm | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.kiwipowered.stakzcore.client | |
import java.util.concurrent.TimeUnit | |
import cats.Monad | |
import cats.effect.concurrent.{Deferred, Ref} | |
import cats.effect.{Async, Clock} | |
import cats.implicits._ | |
import Cache._ | |
import scala.concurrent.duration.FiniteDuration | |
class KeyValueCache[F[_]: Async, K, V] (ref: Ref[F, Map[K, CachedEntry[F, V]]], expiryF: F[Expiry]) { | |
type CacheMap = Map[K, CachedEntry[F, V]] | |
def cache (key: K)(value: F[V]): F[V] = retrieve(key, value, expiryF) | |
def refresh (key: K)(value: F[V]): F[V] = { | |
val forcedExpiry: F[Expiry] = expiryF.map { underlyingExpiry => | |
new Expiry { | |
def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = true | |
val timestamp: Long = underlyingExpiry.timestamp | |
} | |
} | |
retrieve(key, value, forcedExpiry) | |
} | |
private def retrieve (key: K, valueF: F[V], expiryF: F[Expiry]): F[V] = { | |
def readFromMap (map: CacheMap, expiry: Expiry): F[V] = map.get(key) match { | |
case Some(CachedEntry(deferred, spec)) => deferred.get.flatMap(extractOrUpdate(_, spec, expiry)) | |
case None => update(expiry, CacheSpec.empty, None) | |
} | |
def extractOrUpdate (result: Result[V], spec: CacheSpec, expiry: Expiry): F[V] = result match { | |
case Right(value) if !expiry.isExpired(spec, ExpiryMode.Success) => Async[F].pure(value) | |
case Left(error) if !expiry.isExpired(spec, ExpiryMode.Failure) => Async[F].raiseError(error) | |
case _ => update(expiry, spec, result.toOption) | |
} | |
def update (expiry: Expiry, oldSpec: CacheSpec, oldValue: Option[V]): F[V] = Deferred.uncancelable[F, Result[V]].flatMap { deferred => | |
def newValue (oldMap: CacheMap): (CacheMap, F[(Result[V], CacheSpec)]) = { | |
val newSpec = oldSpec.next(expiry) | |
val newMap = oldMap.updated(key, CachedEntry(deferred, newSpec)) | |
val result: F[(Result[V], CacheSpec)] = valueF.attempt.flatMap { result => | |
val newCacheEntry = (result, oldValue) match { | |
case (Left(_), Some(oldValid)) if !expiry.isExpired(oldSpec, ExpiryMode.Resilience) => (Right(oldValid), oldSpec.extend(expiry)) | |
case (newResult, _) => (newResult, newSpec) | |
} | |
deferred.complete(newCacheEntry._1).as(newCacheEntry) | |
} | |
newMap -> result | |
} | |
ref | |
.modify { currentMap => | |
currentMap.get(key) match { | |
case Some(CachedEntry(newDeferred, newSpec)) if newSpec != oldSpec => | |
currentMap -> newDeferred.get.map((_, newSpec)) // concurrent modification | |
case _ => | |
newValue(currentMap) | |
} | |
} | |
.flatten | |
.flatMap { case (result, spec) => extractOrUpdate(result, spec, expiry) } | |
} | |
for { | |
expiry <- expiryF | |
cacheMap <- ref.get | |
value <- readFromMap(cacheMap, expiry) | |
} yield value | |
} | |
} | |
class Cache[F[_], V] (cacheRef: KeyValueCache[F, SingleKey, V]) { | |
def cache (value: F[V]): F[V] = cacheRef.cache(SingleKey)(value) | |
def refresh (value: F[V]): F[V] = cacheRef.refresh(SingleKey)(value) | |
} | |
object Cache { | |
type Result[A] = Either[Throwable, A] | |
case class CachedEntry[F[_], A] (value: Deferred[F, Result[A]], spec: CacheSpec) | |
case class CacheSpec (timestamp: Long, version: Int, extended: Option[Long] = None) { | |
def next (expiry: Expiry): CacheSpec = CacheSpec(expiry.timestamp, version + 1) | |
def extend (expiry: Expiry): CacheSpec = copy(version = version + 1, extended = Some(expiry.timestamp)) | |
} | |
object CacheSpec { | |
val empty: CacheSpec = CacheSpec(0,0) | |
} | |
sealed trait SingleKey | |
case object SingleKey extends SingleKey | |
sealed trait ExpiryMode | |
object ExpiryMode { | |
case object Failure extends ExpiryMode | |
case object Success extends ExpiryMode | |
case object Resilience extends ExpiryMode | |
} | |
sealed trait Expiry { | |
def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean | |
def timestamp: Long | |
} | |
object Expiry { | |
def never[F[_]: Monad]: F[Expiry] = Monad[F].pure { | |
new Expiry { | |
def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = false | |
def timestamp: Long = 0 | |
} | |
} | |
def after[F[_]: Monad] (standardTTL: Long, | |
errorTTL: Long, | |
resilienceTTL: Long, | |
clock: Clock[F]): F[Expiry] = { | |
val ttl: ExpiryMode => Long = { | |
case ExpiryMode.Success => standardTTL | |
case ExpiryMode.Failure => errorTTL | |
case ExpiryMode.Resilience => resilienceTTL | |
} | |
clock | |
.realTime(TimeUnit.MILLISECONDS) | |
.map { currentMillis => | |
new Expiry { | |
def isExpired (spec: CacheSpec, mode: ExpiryMode): Boolean = (mode, spec.extended) match { | |
case (ExpiryMode.Success, Some(ts)) => (currentMillis - ts) > ttl(ExpiryMode.Failure) // retry after failure timeout | |
case _ => (currentMillis - spec.timestamp) > ttl(mode) | |
} | |
def timestamp: Long = currentMillis | |
} | |
} | |
} | |
} | |
def forKeyedValues[F[_]: Async, K, V]: F[KeyValueCache[F, K, V]] = | |
forKeyedValues(Expiry.never[F]) | |
def forKeyedValues[F[_]: Async, K, V](standardTTL: FiniteDuration, | |
errorTTL: FiniteDuration, | |
resilienceTTL: FiniteDuration, | |
clock: Clock[F]): F[KeyValueCache[F, K, V]] = | |
forKeyedValues(Expiry.after(standardTTL.toMillis, errorTTL.toMillis, resilienceTTL.toMillis, clock)) | |
private def forKeyedValues[F[_]: Async, K, V](expiry: F[Expiry]): F[KeyValueCache[F, K, V]] = | |
Ref.of[F, Map[K, CachedEntry[F, V]]](Map.empty).map { ref => | |
new KeyValueCache[F, K, V](ref, expiry) | |
} | |
def forSingleValue[F[_]: Async, V]: F[Cache[F, V]] = | |
forSingleValue(Expiry.never[F]) | |
def forSingleValue[F[_]: Async, V](standardTTL: FiniteDuration, | |
errorTTL: FiniteDuration, | |
resilienceTTL: FiniteDuration, | |
clock: Clock[F]): F[Cache[F, V]] = | |
forSingleValue(Expiry.after(standardTTL.toMillis, errorTTL.toMillis, resilienceTTL.toMillis, clock)) | |
private def forSingleValue[F[_]: Async, V](expiry: F[Expiry]): F[Cache[F, V]] = | |
forKeyedValues[F, SingleKey, V](expiry).map(new Cache(_)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kiwipowered.stakzcore.client | |
import cats.effect.IO | |
import org.scalatest.{FunSuite, Matchers} | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration._ | |
class CacheSpec extends FunSuite with Matchers { | |
class Counter { | |
private val iterator: Iterator[Int] = Iterator.from(1) | |
val next: IO[Int] = IO(iterator.next) | |
} | |
class FlakyCounter { | |
private val iterator: Iterator[Int] = Iterator.from(1) | |
val next: IO[Int] = IO(iterator.next).flatMap { | |
case i if i % 2 == 0 => IO.raiseError(new RuntimeException("Evil even number encountered")) | |
case i => IO.pure(i) | |
} | |
} | |
private val timer = IO.timer(ExecutionContext.global) | |
test("caching a single value") { | |
val counter = new Counter | |
val res = for { | |
newCache <- Cache.forSingleValue[IO, Int](150.millis, 5.millis, 300.millis, timer.clock) | |
cachedValue = newCache.cache(counter.next) | |
v1 <- cachedValue | |
v2 <- cachedValue | |
_ <- timer.sleep(300.millis) | |
v3 <- cachedValue | |
v4 <- cachedValue | |
} yield (v1, v2, v3, v4) | |
res.unsafeRunSync() shouldBe (1,1,2,2) | |
} | |
test("caching key-value pairs") { | |
val fooCounter = new Counter | |
val barCounter = new Counter | |
val res2 = for { | |
newCache <- Cache.forKeyedValues[IO, String, Int](150.millis, 5.millis, 300.millis, timer.clock) | |
cachedFoo = newCache.cache("foo")(fooCounter.next) | |
cachedBar = newCache.cache("bar")(barCounter.next) | |
v1 <- cachedFoo | |
v2 <- cachedFoo | |
v3 <- cachedBar | |
v4 <- cachedBar | |
_ <- timer.sleep(300.millis) | |
v5 <- cachedFoo | |
v6 <- cachedFoo | |
v7 <- cachedBar | |
v8 <- cachedBar | |
} yield (v1, v2, v3, v4, v5, v6, v7, v8) | |
res2.unsafeRunSync() shouldBe (1,1,1,1,2,2,2,2) | |
} | |
test("caching key-value pairs with fresh IO instances") { | |
val fooCounter = (new Counter).next | |
val barCounter = (new Counter).next | |
val res2 = for { | |
newCache <- Cache.forKeyedValues[IO, String, Int](150.millis, 5.millis, 300.millis, timer.clock) | |
v1 <- newCache.cache("foo")(fooCounter) | |
v2 <- newCache.cache("foo")(fooCounter) | |
v3 <- newCache.cache("bar")(barCounter) | |
v4 <- newCache.cache("bar")(barCounter) | |
_ <- timer.sleep(300.millis) | |
v5 <- newCache.cache("foo")(fooCounter) | |
v6 <- newCache.cache("foo")(fooCounter) | |
v7 <- newCache.cache("bar")(barCounter) | |
v8 <- newCache.cache("bar")(barCounter) | |
} yield (v1, v2, v3, v4, v5, v6, v7, v8) | |
res2.unsafeRunSync() shouldBe (1,1,1,1,2,2,2,2) | |
} | |
test("caching a single value that occasionally fails") { | |
val counter = new FlakyCounter | |
val res = for { | |
newCache <- Cache.forSingleValue[IO, Int](150.millis, 50.millis, 200.millis, timer.clock) | |
cachedValue = newCache.cache(counter.next).handleErrorWith(_ => IO.pure(0)) | |
v1 <- cachedValue | |
_ <- timer.sleep(300.millis) | |
v2 <- cachedValue | |
v3 <- cachedValue | |
_ <- timer.sleep(100.millis) | |
v4 <- cachedValue | |
v5 <- cachedValue | |
} yield (v1, v2, v3, v4, v5) | |
res.unsafeRunSync() shouldBe (1,0,0,3,3) | |
} | |
test("use resilience TTL for a value that fails") { | |
val counter = new FlakyCounter | |
val res = for { | |
newCache <- Cache.forSingleValue[IO, Int](100.millis, 50.millis, 200.millis, timer.clock) | |
cachedValue = newCache.cache(counter.next).handleErrorWith(_ => IO.pure(0)) | |
v1 <- cachedValue | |
_ <- timer.sleep(150.millis) | |
v2 <- cachedValue | |
v3 <- cachedValue | |
_ <- timer.sleep(100.millis) | |
v4 <- cachedValue | |
v5 <- cachedValue | |
} yield (v1, v2, v3, v4, v5) | |
res.unsafeRunSync() shouldBe (1,1,1,3,3) | |
} | |
test("retry resilience cache after failure TTL") { | |
val counter = new FlakyCounter | |
val res = for { | |
newCache <- Cache.forSingleValue[IO, Int](100.millis, 20.millis, 500.millis, timer.clock) | |
cachedValue = newCache.cache(counter.next).handleErrorWith(_ => IO.pure(0)) | |
v1 <- cachedValue | |
_ <- timer.sleep(150.millis) | |
v2 <- cachedValue | |
v3 <- cachedValue | |
_ <- timer.sleep(100.millis) | |
v4 <- cachedValue | |
v5 <- cachedValue | |
} yield (v1, v2, v3, v4, v5) | |
res.unsafeRunSync() shouldBe (1,1,1,3,3) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment