Skip to content

Instantly share code, notes, and snippets.

@kutchar
Last active December 31, 2016 05:41
Show Gist options
  • Save kutchar/0ffb0ce38ee464b7e8c43cb9bb749369 to your computer and use it in GitHub Desktop.
Save kutchar/0ffb0ce38ee464b7e8c43cb9bb749369 to your computer and use it in GitHub Desktop.
DelayWhen
import java.util.Date
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
/**
 * Flow stage that forwards every element unchanged, but pauses pulling from upstream
 * for `d` after it has emitted an element for which `p` returns `true`.
 *
 * The matching element itself is pushed downstream immediately; only the *next* pull
 * of upstream is postponed until the timer fires.
 *
 * @param d   how long to hold back upstream demand after a matching element
 * @param p   predicate selecting the elements that trigger the pause
 * @tparam A  element type flowing through the stage
 */
class DelayWhen[A](d: FiniteDuration, p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("DelayWhen.in")
  val out: Outlet[A] = Outlet[A]("DelayWhen.out")
  val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      // True between emitting a matching element and the timer firing.
      private[this] var waiting = false

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // Upstream is never pulled while `waiting` is set, so this guard is not
          // expected to be false here; it is kept from the original as a safety net.
          if (!waiting) {
            val elem = grab(in)
            if (p(elem)) {
              waiting = true
              scheduleOnce("DelayWhen", d)
            }
            push(out, elem)
          }
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          // While paused, remember nothing: the pending demand stays visible through
          // isAvailable(out) and is served from onTimer.
          if (!waiting) {
            pull(in)
          }
        }
      })

      override protected def onTimer(key: Any): Unit = {
        waiting = false
        // Only resume pulling if downstream has already asked for an element.
        // Pulling without demand would lead to push(out, ...) on a port that is not
        // ready, which fails the stage. If there is no demand yet, the next onPull
        // pulls upstream itself because `waiting` is now cleared.
        if (isAvailable(out) && !hasBeenPulled(in)) {
          pull(in)
        }
      }
    }

  override def toString = "DelayWhen"
}
/**
 * Demo application: polls `getItems` through `Source.unfoldAsync`, runs the batches
 * through a [[DelayWhen]] stage that pauses for 5 seconds after every empty batch,
 * prints each batch with a timestamp and terminates the actor system when the
 * stream completes.
 */
object DelayWhen extends App {
  // Implicits carry explicit types so their resolution does not depend on inference.
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  // Number of polls performed so far.
  var count = 0
  // Running number of the non-empty items produced so far.
  var index = 0

  /**
   * Simulates one poll of a slow data source.
   *
   * Every call increments `count` and blocks for 500 ms. It then yields:
   *  - `None` once `count` has reached 20 (ends the unfold),
   *  - `Some(Nil)` when `count` is a multiple of 5 (an empty batch),
   *  - otherwise a single-element batch of the form "<date> <index>".
   */
  def getItems: Future[Option[Seq[String]]] = {
    count += 1
    Thread.sleep(500) // artificial latency for the demo
    if (count >= 20) {
      Future.successful(None)
    } else if (count % 5 == 0) {
      Future.successful(Some(Nil))
    } else {
      index += 1
      // String interpolation instead of `Date + String`, which relies on the
      // deprecated any2stringadd conversion; the resulting text is identical.
      Future.successful(Some(Seq(s"${new Date()} $index")))
    }
  }

  Source
    .unfoldAsync(NotUsed) { _ => getItems map (_.map(items => NotUsed -> items)) }
    .via(new DelayWhen(Duration(5, TimeUnit.SECONDS), _.isEmpty))
    .runForeach(t => println(s"${new Date()}: $t"))
    .andThen { case _ => system.terminate() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment