Last active
December 31, 2016 05:41
-
-
Save kutchar/0ffb0ce38ee464b7e8c43cb9bb749369 to your computer and use it in GitHub Desktop.
DelayWhen
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Date | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.Future | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.duration.FiniteDuration | |
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.Attributes | |
import akka.stream.FlowShape | |
import akka.stream.Inlet | |
import akka.stream.Outlet | |
import akka.stream.scaladsl.Source | |
import akka.stream.stage.GraphStage | |
import akka.stream.stage.GraphStageLogic | |
import akka.stream.stage.InHandler | |
import akka.stream.stage.OutHandler | |
import akka.stream.stage.TimerGraphStageLogic | |
/**
 * A flow stage that forwards every element unchanged, but stops pulling from upstream
 * for the duration `d` after it has emitted an element for which `p` returns true.
 *
 * The matching element itself is emitted immediately; only the request for the *next*
 * element is postponed until the timer fires.
 *
 * @param d how long to hold back upstream demand after a matching element
 * @param p predicate deciding whether an element triggers the pause
 * @tparam A type of the elements flowing through the stage
 */
class DelayWhen[A](d: FiniteDuration, p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("DelayWhen.in")
  val out: Outlet[A] = Outlet[A]("DelayWhen.out")
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      // True from the moment a matching element is emitted until the timer fires.
      private[this] var waiting = false

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // `in` is only ever pulled while not waiting, so a push cannot arrive mid-pause.
          val elem = grab(in)
          if (p(elem)) {
            waiting = true
            scheduleOnce("DelayWhen", d)
          }
          push(out, elem)
        }
      })

      setHandler(out, new OutHandler {
        // While paused the demand is remembered by the outlet itself (isAvailable(out))
        // and is served from onTimer once the pause is over.
        override def onPull(): Unit =
          if (!waiting) pull(in)
      })

      override protected def onTimer(key: Any): Unit = {
        waiting = false
        // Only forward demand that downstream has actually signalled. Pulling
        // unconditionally here would pull `in` without downstream demand, which leads to
        // either a second pull(in) from the next onPull or a push to an outlet that has
        // not been pulled - both are illegal port operations.
        if (isAvailable(out) && !hasBeenPulled(in)) pull(in)
      }
    }

  override def toString: String = "DelayWhen"
}
/**
 * Small demo: polls a fake data source every 500 ms and prints each batch; whenever an
 * empty batch is received, the stream waits 5 seconds before polling again.
 */
object DelayWhen extends App {
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  // Number of polls performed so far.
  var count = 0
  // Running number attached to each produced item.
  var index = 0

  /**
   * Simulates one poll of a data source.
   *
   * Every fifth poll yields an empty batch, every other poll yields one timestamped
   * item, and from the 20th poll onwards `None` is returned, which ends the unfold.
   */
  def getItems: Future[Option[Seq[String]]] = {
    count += 1
    Thread.sleep(500) // simulated latency of the source
    val batch: Option[Seq[String]] =
      if (count >= 20) None
      else if (count % 5 == 0) Some(Nil)
      else {
        index += 1
        Some(Seq(s"${new Date()} $index"))
      }
    Future.successful(batch)
  }

  Source
    .unfoldAsync(NotUsed) { _ =>
      getItems.map(batch => batch.map(items => NotUsed -> items))
    }
    .via(new DelayWhen(Duration(5, TimeUnit.SECONDS), _.isEmpty))
    .runForeach(t => println(s"${new Date()}: $t"))
    .andThen { case _ => system.terminate() } // shut down whether the stream succeeded or failed
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment