Skip to content

Instantly share code, notes, and snippets.

@quelgar
Last active February 10, 2019 01:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/d13d313e50bd1acd3b9bb9f687d8931a to your computer and use it in GitHub Desktop.
Save quelgar/d13d313e50bd1acd3b9bb9f687d8931a to your computer and use it in GitHub Desktop.
**NOTE**: I think this turned out to be pretty buggy. A custom Akka graph stage that caches the latest value for a period of time.
import java.time.{Clock, Instant}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet, _}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, _}
import scaladsl.{Flow, _}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
* A custom Akka graph stage that caches the latest value for a period of time.
*
* The upstream is only asked to produce a value if: there is demand from downstream and,
* the cache duration has expired. If the cache duration has not yet expired, all downstream pull
* requests are replied to with the cached value.
*
* The stage initially starts without a value, but in an expired state. This means the first pull
* from downstream will result in a pull to the upstream.
*
* This is useful when you always want to be able to immediately provide a value in response to
* downstream demand, but the source may be expensive to pull from, and you know the values emitted
* by the source change infrequently.
*/
final class CacheStage[A](cacheFor: FiniteDuration, clock: Clock) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("CacheStage.in")
val out = Outlet[A]("CacheStage.out")
override def shape: FlowShape[A, A] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var currentValue: A = _
private var expires = Instant.MIN
setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = {
currentValue = grab(in)
expires = clock.instant() plusMillis cacheFor.toMillis
push(out, currentValue)
}
override def onPull(): Unit = {
if (clock.instant() isAfter expires) {
pull(in)
} else {
push(out, currentValue)
}
}
})
}
}
object CacheStage {
def apply[A](cacheFor: FiniteDuration, clock: Clock): Flow[A, A, NotUsed] = {
Flow.fromGraph(new CacheStage(cacheFor, clock))
}
}
object CacheStageTest {
def main(args: Array[String]): Unit = {
val src = Source(1 to 5)
implicit val system = ActorSystem("CacheStageTest")
implicit val materializer = ActorMaterializer()
val clock = Clock.systemDefaultZone()
// demand a value every second, cache each value for 5 seconds
val flow = src.via(CacheStage(5 second span, clock))
.delay(1 second span, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(1, 1))
Await.result(flow.runForeach(i => println(f"${clock.instant().toEpochMilli}%TT : $i")), 1 minute span)
println(Await.result(system.terminate(), 1 minute span))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment