Skip to content

Instantly share code, notes, and snippets.

@LMnet
Last active October 9, 2017 07:55
Show Gist options
  • Save LMnet/2d1b2a19ec686aa059bd3b4f03518092 to your computer and use it in GitHub Desktop.
Save LMnet/2d1b2a19ec686aa059bd3b4f03518092 to your computer and use it in GitHub Desktop.
CronSource.scala
package ru.dgis.casino.sharpy
import java.util.Date
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import akka.actor.Cancellable
import akka.stream.scaladsl.Source
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler, StageLogging, TimerGraphStageLogic}
import akka.stream.{Attributes, Outlet, SourceShape}
import org.quartz.CronExpression
import scala.concurrent.duration.FiniteDuration
/**
  * Factory for [[CronSource]] stages. Based on `TickSource`.
  */
object CronSource {

  /**
    * Builds a `Source` backed by a new [[CronSource]] stage.
    *
    * @param cronExpression expression handed to the stage
    * @param tick           element handed to the stage
    * @tparam T             type of the emitted element
    * @return a `Source` of `T` whose materialized value is the stage's `Cancellable`
    */
  def apply[T](cronExpression: CronExpression, tick: T): Source[T, Cancellable] = {
    val stage = new CronSource[T](cronExpression, tick)
    Source.fromGraph(stage)
  }
}
/**
  * Source stage that emits `tick` every time `cronExpression` fires.
  *
  * While downstream has no demand, at most one tick is remembered and it is pushed on the
  * next pull. The materialized `Cancellable` completes the stage when cancelled. If the cron
  * expression has no further fire times, the stage completes (after delivering a remembered
  * tick, if there is one).
  *
  * @param cronExpression schedule used to compute the delay until the next tick
  * @param tick           element pushed downstream on every fire time
  * @tparam T             type of the emitted element
  */
class CronSource[T](cronExpression: CronExpression, tick: T)
  extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {

  val out: Outlet[T] = Outlet("CronSource")
  override val shape = SourceShape(out)

  /**
    * Creates the stage logic; the very same object is returned as the materialized
    * `Cancellable`.
    */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = {
    val logic = new TimerGraphStageLogic(shape) with StageLogging with Cancellable {
      val cancelled = new AtomicBoolean(false)
      // Stays None until preStart has run, so an early cancel() has no callback to invoke.
      val cancelCallback: AtomicReference[Option[AsyncCallback[Unit]]] = new AtomicReference(None)
      // True while a tick has fired but could not be pushed for lack of demand.
      val isTickUndelivered = new AtomicBoolean(false)
      // True once the cron expression reported that it has no further fire times.
      private val isScheduleExhausted = new AtomicBoolean(false)

      override def preStart(): Unit = {
        super.preStart()
        cancelCallback.set(Some(getAsyncCallback[Unit](_ ⇒ completeStage())))
        // cancel() may already have been called before the callback existed; honour it here.
        if (cancelled.get) completeStage()
        else scheduleNext()
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (isTickUndelivered.getAndSet(false)) {
            push(out, tick)
            // No timer will ever fire again, so finish once the remembered tick is out.
            if (isScheduleExhausted.get) completeStage()
          }
        }
      })

      override protected def onTimer(timerKey: Any): Unit = {
        if (!isClosed(out) && !isCancelled) {
          if (isAvailable(out)) {
            push(out, tick)
            isTickUndelivered.set(false)
          } else {
            // No demand right now: remember the tick, onPull will deliver it.
            isTickUndelivered.set(true)
          }
          scheduleNext()
        }
      }

      /**
        * Schedules a one-shot timer for the next fire time of the cron expression, or
        * completes the stage when the expression has no fire time left.
        */
      def scheduleNext(): Unit = {
        val currentDate = new Date
        // getNextValidTimeAfter returns null when the expression can never fire again.
        Option(cronExpression.getNextValidTimeAfter(currentDate)) match {
          case Some(nextDate) ⇒
            val delay = FiniteDuration(nextDate.getTime - currentDate.getTime, TimeUnit.MILLISECONDS)
            log.debug(s"Scheduling $tick at ${nextDate.toInstant}")
            // The tick value itself doubles as the timer key; onTimer ignores the key.
            scheduleOnce(tick, delay)
          case None ⇒
            isScheduleExhausted.set(true)
            log.debug("No further fire times for cron expression {}, completing", cronExpression)
            // A remembered tick is still owed to downstream; onPull completes after pushing it.
            if (!isTickUndelivered.get) completeStage()
        }
      }

      /**
        * Marks the source as cancelled and asks the stage to complete.
        *
        * @return true only for the first call, false for every later one
        */
      override def cancel(): Boolean = {
        val success = !cancelled.getAndSet(true)
        // Completion goes through the async callback instead of calling completeStage() here.
        if (success) cancelCallback.get.foreach(_.invoke(()))
        success
      }

      override def isCancelled: Boolean = cancelled.get
    }
    (logic, logic)
  }
}
package ru.dgis.casino.sharpy
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.testkit.TestSubscriber
import akka.testkit.TestKit
import org.quartz.CronExpression
import org.scalatest.FreeSpecLike
import ru.dgis.util.ScalatestTestKit
import scala.concurrent.duration._
/**
  * Spec for [[CronSource]]. Based on `akka.stream.scaladsl.TickSourceSpec`.
  *
  * Every case materializes a source driven by the cron expression `* * * * * ? *` and
  * observes it through a manual probe.
  */
class CronSourceTest extends ScalatestTestKit with FreeSpecLike {

  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // Element emitted by the source under test.
  private val Tick = "tick"

  /**
    * Runs a fresh `CronSource` into a manual probe.
    *
    * @return the probe together with the materialized `Cancellable`
    */
  def prepareTest(): (TestSubscriber.ManualProbe[String], Cancellable) = {
    val everySecond = new CronExpression("* * * * * ? *")
    val subscriber = TestSubscriber.manualProbe[String]()
    val handle = CronSource(everySecond, Tick)
      .to(Sink.fromSubscriber(subscriber))
      .run()
    (subscriber, handle)
  }

  "CronSource should" - {

    "produce ticks" in {
      val subscriber = prepareTest()._1
      val subscription = subscriber.expectSubscription()
      subscription.request(2)

      subscriber.expectNext(1.second, Tick)
      subscriber.expectNoMsg(900.millis)
      subscriber.expectNext(200.millis, Tick)

      subscription.cancel()
      subscriber.expectNoMsg(1.second)
    }

    "buffer ticks when not requested" in {
      val subscriber = prepareTest()._1
      val subscription = subscriber.expectSubscription()

      subscription.request(1)
      subscriber.expectNext(Tick)
      // No demand during this window; the tick that fires meanwhile must arrive on request.
      subscriber.expectNoMsg(1100.millis)
      subscription.request(1)
      subscriber.expectNext(100.millis, Tick)

      subscription.cancel()
      subscriber.expectNoMsg(1.second)
    }

    "be possible to cancel" in {
      val (subscriber, handle) = prepareTest()
      val subscription = subscriber.expectSubscription()
      subscription.request(2)

      subscriber.expectNext(Tick)
      subscriber.expectNoMsg(900.millis)
      subscriber.expectNext(200.millis, Tick)

      handle.cancel()
      TestKit.awaitCond(handle.isCancelled, 200.millis)
      subscription.request(3)
      subscriber.expectComplete()
    }

    "acknowledge cancellation only once" in {
      val (subscriber, handle) = prepareTest()
      val subscription = subscriber.expectSubscription()
      subscription.request(2)
      subscriber.expectNext(Tick)

      val firstAttempt = handle.cancel()
      assert(firstAttempt)
      val secondAttempt = handle.cancel()
      assert(!secondAttempt)
      subscriber.expectComplete()
    }

    "have isCancelled mirror the cancellation state" in {
      val (subscriber, handle) = prepareTest()
      val subscription = subscriber.expectSubscription()
      subscription.request(2)
      subscriber.expectNext(Tick)

      assert(!handle.isCancelled)
      assert(handle.cancel())
      assert(handle.isCancelled)
      subscriber.expectComplete()
    }

    "support being cancelled immediately after its materialization" in {
      val (subscriber, handle) = prepareTest()
      assert(handle.cancel())

      val subscription = subscriber.expectSubscription()
      subscription.request(2)
      subscriber.expectComplete()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment