Last active
August 29, 2015 14:25
-
-
Save TrustNoOne/c7087713fa78375b2cb0 to your computer and use it in GitHub Desktop.
PersistentView exposed as a Source[T, _]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2015 ReaQta Ltd | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files | |
* (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, | |
* publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, | |
* subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES | |
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE | |
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR | |
* IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
* | |
*/ | |
package reaqta.utils.stream | |
import akka.actor._ | |
import akka.persistence._ | |
import akka.stream.actor.ActorPublisher | |
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request } | |
import akka.stream.scaladsl.Source | |
import reaqta.utils.stream.PersistentViewSource.{ UpdateCompleted, DoUpdate } | |
import scala.reflect.ClassTag | |
/** | |
* <p> | |
* Exposes a persistent view as a [[akka.stream.scaladsl.Source]]. | |
* When materialized, the source emits persisted events as seen by a PersistentView with the given
* persistenceId and viewId. It keeps emitting persisted events until cancelled. | |
* </p> | |
* | |
* <p> | |
* The first emitted item will be a [[akka.persistence.SnapshotOffer]] if the following applies: | |
* <ul> | |
* <li>persistenceId == viewId</li> | |
* <li>the persistent actor has a snapshot</li>
* <li>the type parameter T is assignable from the class [[akka.persistence.SnapshotOffer]]</li> | |
* </ul> | |
* </p> | |
* | |
* {{{ | |
* PersistentViewSource[Any](myPersistenceId, myViewId) | |
* .runWith(Sink.foreach(elem ⇒ println(s"Received persisted event: $elem"))) | |
* | |
* //Only events of a certain type. E.g. with myPersistenceId == myViewId this could filter out the snapshots | |
* PersistentViewSource[SomeEventType](myPersistenceId, myViewId) | |
* .runWith(Sink.foreach(elem ⇒ println(s"Received persisted event: $elem"))) | |
* }}} | |
*/ | |
object PersistentViewSource {

  /**
   * Builds a [[akka.stream.scaladsl.Source]] whose elements are published by a [[PersistentViewSource]] actor.
   *
   * @param persistenceId persistenceId given to the underlying view. See [[akka.persistence.PersistentView]]
   * @param viewId        viewId given to the underlying view. See [[akka.persistence.PersistentView]]
   * @tparam T only events of this type are emitted
   * @return a Source of the events as seen by a PersistentView with the given persistenceId and viewId,
   *         materialized as the [[akka.actor.ActorRef]] of the publisher actor
   */
  def apply[T: ClassTag](persistenceId: String, viewId: String): Source[T, ActorRef] = {
    val publisherProps = Props(new PersistentViewSource[T](persistenceId, viewId))
    Source.actorPublisher[T](publisherProps)
  }

  /** Internal message the publisher sends to itself (through the scheduler) to trigger a journal update. */
  private[PersistentViewSource] object DoUpdate

  /** Internal message the publisher sends to itself right after an Update, to learn that the update has finished. */
  private[PersistentViewSource] object UpdateCompleted
}
/**
 * An [[akka.stream.actor.ActorPublisher]] that is also a [[akka.persistence.PersistentView]]: it reads the events
 * of `persistenceId` through explicit [[akka.persistence.Update]] requests and emits those of type `T` downstream.
 *
 * Automatic updates are disabled. The journal is read only when there is downstream demand, in batches of at most
 * the number of elements requested, so that no more elements are replayed than can be emitted.
 *
 * @param persistenceId persistenceId of the view
 * @param viewId        viewId of the view
 * @param tag           runtime class of `T`, used to select the events (and possibly the snapshot) to emit
 * @tparam T only events of this type are emitted
 */
class PersistentViewSource[T](override val persistenceId: String, override val viewId: String)(implicit tag: ClassTag[T]) extends ActorPublisher[T] with PersistentView {
  // execution context required by the scheduler
  import context.dispatcher

  //disable auto update and recovery
  override val autoUpdate = false
  override val autoUpdateReplayMax = 0L

  // a SnapshotOffer is emitted only if it is a valid T
  val emitsSnapshot = tag.runtimeClass.isAssignableFrom(classOf[SnapshotOffer])

  // snapshot offered to the view and not yet emitted downstream
  var snapshot: Option[SnapshotOffer] = None

  // periodic DoUpdate timer, active only while waiting for new events to show up in the journal
  var scheduledUpdate: Option[Cancellable] = None

  // true between an Update request and the matching UpdateCompleted
  var updateRunning = false

  // number of items received in the last batch of persistent events (whether or not they were emitted)
  var persistentUpdatesReceived = 0

  override def receive: Receive = {
    case ss: SnapshotOffer if emitsSnapshot ⇒ snapshot = Some(ss)
    case elem: T if isPersistent ⇒ emitElement(elem)
    // A replayed event that is not a T is dropped, but it still proves that the journal returned something:
    // count it, otherwise a batch made only of filtered-out events would look empty and the next read
    // would be delayed by autoUpdateInterval even though more events may be available.
    case _ if isPersistent ⇒ persistentUpdatesReceived += 1
    case Request(n) if !updateRunning ⇒ requestUpdate(n)
    case DoUpdate if !updateRunning ⇒ requestUpdate(totalDemand)
    case UpdateCompleted ⇒ scheduleNextUpdate()
    case Cancel ⇒ cancelAndClose()
  }

  /**
   * Emits an element downstream. We ask for updates only if we have requests so this should respect backpressure
   */
  def emitElement(elem: T) = {
    persistentUpdatesReceived += 1
    onNext(elem)
  }

  /**
   * Schedules the next update after the previous one is completed. If the last update returned at least one element,
   * the next one is requested immediately. If the last one was empty, the next one will happen after
   * [[autoUpdateInterval]] or as soon as a new request comes in. Nothing is scheduled while there is no demand.
   */
  def scheduleNextUpdate() = {
    updateRunning = false
    if (totalDemand > 0 && persistentUpdatesReceived > 0) {
      log.debug("Immediately requesting more elements from the journal because last batch had size > 0")
      requestUpdate(totalDemand)
    } else if (totalDemand > 0) {
      log.debug("Nothing received in last batch of persistent events - Scheduling automatic update")
      // defensive: never lose the handle of a timer that is still running
      scheduledUpdate foreach (_.cancel())
      scheduledUpdate = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, DoUpdate))
    }
  }

  /**
   * Enqueues an Update with await == true so that UpdateCompleted will be received only after the update completes.
   * More requests could be already enqueued and received before the Update message. We set updateRunning = true so
   * that those requests are ignored and taken care of only after this update completes. The same applies to the less
   * likely event of stashing a Request message received in between the Update and UpdateCompleted messages
   *
   * @param n number of elements that may be emitted; a pending snapshot, if emitted, uses one of them
   */
  def requestUpdate(n: Long) = {
    // never ask the journal for a negative number of events
    val toRequest = math.max(0L, if (emitSnapshot()) n - 1 else n)
    log.debug("Requesting an update for {} items from the journal", toRequest)
    updateRunning = true
    persistentUpdatesReceived = 0
    // cancel the automatic update (if scheduled), it will be created again after this update request completes
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    self ! Update(await = true, replayMax = toRequest)
    self ! UpdateCompleted
  }

  /**
   * Emits the pending snapshot, if there is one and there is demand for it.
   *
   * @return true if a snapshot has been emitted, false otherwise
   */
  def emitSnapshot(): Boolean = snapshot match {
    // onNext is only allowed when there is demand
    case Some(offer) if totalDemand > 0 ⇒
      snapshot = None
      // safe: a snapshot is stored only when emitsSnapshot is true, i.e. SnapshotOffer is a valid T
      onNext(offer.asInstanceOf[T])
      true
    case _ ⇒ false
  }

  /**
   * Cancels scheduled updates and stops the actor
   */
  def cancelAndClose() = {
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    // stop right away instead of queueing a PoisonPill behind other (possibly stashed) messages
    context.stop(self)
  }

  override def postStop(): Unit = {
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    // let the mixed-in traits run their own cleanup
    super.postStop()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment