Skip to content

Instantly share code, notes, and snippets.

@kryptt
Created September 28, 2021 10:04
Show Gist options
  • Save kryptt/e20e861df414a643503112b408abde38 to your computer and use it in GitHub Desktop.
Save kryptt/e20e861df414a643503112b408abde38 to your computer and use it in GitHub Desktop.
Caching pipes
package kobo
package assist
import scalacache.{Cache, Mode}
import fs2.{INothing, Pipe, Stream}
import fs2.concurrent._
import cats.syntax.all._
import cats.data.{EitherT, OptionT}
import scala.concurrent.duration._
import cats.effect.{Concurrent, ExitCase}
final case class EmptyFallbackCache(keyParts: Seq[Any])
extends RuntimeException(s"Fallback cache was empty for $keyParts")
object Pipes {
/** Dual caching strategy where the longTermCache is used in case of errors */
def backingCache[F[_]: Mode: Concurrent, T](keyParts: Any*)(
shortTermCache: Cache[Vector[T]],
shortTermDuration: Option[Duration],
fallbackCache: Cache[Vector[T]],
fallbackDuration: Option[Duration],
): Pipe[F, T, T] = {
val cacheKey = keyParts :+ "shortTerm"
val fallbackKey = keyParts :+ "longTerm"
fetch =>
Stream.force(
OptionT(shortTermCache.get(cacheKey: _*)).cataF(
Queue.noneTerminated[F, T].map { queue =>
broadcastPut[F, T](
fetch,
queue,
put(cacheKey: _*)(shortTermCache, shortTermDuration),
put(fallbackKey: _*)(fallbackCache, fallbackDuration),
)
.handleErrorWith(_ =>
Stream.evalSeq(
EitherT
.fromOptionF(
fallbackCache.get(fallbackKey: _*),
EmptyFallbackCache(fallbackKey),
)
.rethrowT
)
)
},
Stream.emits(_).covary[F].pure,
)
)
}
/** Similar to Cache.cachingF, except the stream output uninterrupted. This can lead to competing
* writes and racing between values.
*/
def caching[F[_]: Mode: Concurrent, T](
keyParts: Any*
)(cache: Cache[Vector[T]], duration: Option[Duration]): Pipe[F, T, T] =
fetch =>
Stream.force(
OptionT(cache.get(keyParts: _*)).cataF(
Queue
.noneTerminated[F, T]
.map(broadcastPut(fetch, _, put(keyParts: _*)(cache, duration))),
Stream.emits(_).covary[F].pure,
)
)
def broadcastPut[F[_]: Concurrent, T](
fetch: Stream[F, T],
queue: NoneTerminatedQueue[F, T],
pipes: Pipe[F, T, T]*
): Stream[F, T] =
fetch
.evalTap(t => queue.enqueue1(Some(t)))
.onFinalize(queue.enqueue1(None))
.onFinalizeCase {
case ExitCase.Completed if pipes.size === 1 =>
queue.dequeue.through(pipes(0)).compile.drain
case ExitCase.Completed =>
queue.dequeue.broadcastThrough(pipes: _*).compile.drain
case _ => Concurrent[F].unit
}
/** Pipe all output to the supplied cache as a Vector[T] */
def put[F[_]: Mode: Concurrent, T](
keyParts: Any*
)(cache: Cache[Vector[T]], duration: Option[Duration]): Pipe[F, T, INothing] =
s =>
Stream.force(
s.compile
.toVector
.flatMap(cache.put(keyParts: _*)(_, duration))
.as(Stream.empty[F])
)
}
package kobo
package assist
import org.specs2.Specification
import cats.effect.testing.specs2.CatsIO
import scalacache.caffeine.CaffeineCache
import scalacache.CatsEffect.modes._
import cats.syntax.all._
import cats.effect.IO
import fs2.Stream
class PipesSpeq extends Specification with CatsIO {
override def is =
s2"""
puts to cache $putsCache
caching values $caching
caching skips calls $cachingSkips
caching does not store partial $cachingImpartial
backingCache skips calls $backingSkips
backingCache uses fallback $backingUsesFallback
backingCache sends intermediate output $backingEmits
backingCache writes to both caches on sucess $backingWritesBoth
"""
val cache = CaffeineCache[Vector[Int]]
def putsCache =
Stream(1, 2, 3)
.covary[IO]
.through(Pipes.put("putsTest")(cache, None))
.compile
.drain
.productR(cache.get[IO]("putsTest"))
.map(_ must beSome(beEqualTo(Vector(1, 2, 3))))
def caching =
Stream(6, 7)
.covary[IO]
.through(Pipes.caching("cachingTest")(cache, None))
.compile
.toVector
.product(cache.get[IO]("cachingTest"))
.map { case (s, c) => c must beSome(beEqualTo(s)) }
def cachingSkips =
cache
.put[IO]("cachingSkips")(Vector(1, 2), None)
.productR(
Stream
.raiseError[IO](new RuntimeException())
.through(Pipes.caching("cachingSkips")(cache, None))
.compile
.toVector
)
.map(_ must beEqualTo(Vector(1, 2)))
def cachingImpartial =
Stream(1, 2, 3)
.append(Stream.raiseError[IO](new RuntimeException()))
.through(Pipes.caching("cachingImpartial")(cache, None))
.compile
.toVector
.attempt
.product(cache.get[IO]("cachingImpartial"))
.map { case (s, c) => (s must beLeft).and(c must beNone) }
def backingSkips =
cache
.put[IO]("backingSkips", "shortTerm")(Vector(1, 2, 4), None)
.productR(
Stream
.raiseError[IO](new RuntimeException())
.through(Pipes.backingCache("backingSkips")(cache, None, cache, None))
.compile
.toVector
)
.map(_ must beEqualTo(Vector(1, 2, 4)))
def backingWritesBoth =
Stream(1, 3)
.covary[IO]
.through(Pipes.backingCache("backingWritesBoth")(cache, None, cache, None))
.compile
.toVector
.product(cache.get[IO]("backingWritesBoth", "shortTerm"))
.product(cache.get[IO]("backingWritesBoth", "longTerm"))
.map {
case ((s, cS), cL) =>
println(s"s: $s, cS: $cS, cL: $cL")
(cS must beSome(beEqualTo(s))).and(cL must beSome(beEqualTo(s)))
}
def backingUsesFallback =
cache
.put[IO]("backingUsesFallback", "longTerm")(Vector(5, 6), None)
.productR(
Stream
.raiseError[IO](new RuntimeException())
.through(Pipes.backingCache("backingUsesFallback")(cache, None, cache, None))
.compile
.toVector
.product(cache.get[IO]("backingUsesFallback", "shortTerm"))
.product(cache.get[IO]("backingUsesFallback", "longTerm"))
.map {
case ((s, cS), cL) =>
(cL must beSome(beEqualTo(s))).and(cS must beNone)
}
)
def backingEmits = {
val err = new RuntimeException()
Stream(1, 2)
.append(Stream.raiseError[IO](err))
.append(Stream(3, 4))
.through(Pipes.caching("backingEmits")(cache, None))
.attempt
.compile
.toVector
.product(cache.get[IO]("backingEmits", "longTerm"))
.map {
case (s, c) => (s must beEqualTo(Vector(Right(1), Right(2), Left(err)))).and(c must beNone)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment