Skip to content

Instantly share code, notes, and snippets.

@TrustNoOne
Last active August 29, 2015 14:25
Show Gist options
  • Save TrustNoOne/c7087713fa78375b2cb0 to your computer and use it in GitHub Desktop.
Save TrustNoOne/c7087713fa78375b2cb0 to your computer and use it in GitHub Desktop.
PersistentView exposed as a Source[T, _]
/*
* Copyright (c) 2015 ReaQta Ltd
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
* IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
package reaqta.utils.stream
import akka.actor._
import akka.persistence._
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
import akka.stream.scaladsl.Source
import reaqta.utils.stream.PersistentViewSource.{ UpdateCompleted, DoUpdate }
import scala.reflect.ClassTag
/**
* <p>
* Exposes a persistent view as a [[akka.stream.scaladsl.Source]].
* When materialized, the source emits persisted events as seen by a PersistentView with the given
* persistenceId and viewId. It keeps emitting persisted events until cancelled.
* </p>
*
* <p>
* The first emitted item will be a [[akka.persistence.SnapshotOffer]] if the following applies:
* <ul>
* <li>persistenceId == viewId</li>
* <li>the persistent actor has a snapshot</li>
* <li>the type parameter T is assignable from the class [[akka.persistence.SnapshotOffer]]</li>
* </ul>
* </p>
*
* {{{
* PersistentViewSource[Any](myPersistenceId, myViewId)
* .runWith(Sink.foreach(elem ⇒ println(s"Received persisted event: $elem")))
*
* //Only events of a certain type. E.g. with myPersistenceId == myViewId this could filter out the snapshots
* PersistentViewSource[SomeEventType](myPersistenceId, myViewId)
* .runWith(Sink.foreach(elem ⇒ println(s"Received persisted event: $elem")))
* }}}
*/
object PersistentViewSource {

  /**
   * Creates a PersistentViewSource for a [[akka.persistence.PersistentActor]].
   *
   * @param persistenceId PersistentView persistenceId. See [[akka.persistence.PersistentView]]
   * @param viewId PersistentView viewId. See [[akka.persistence.PersistentView]]
   * @tparam T Only events of this type are returned
   * @return A Source of all the events as seen by a PersistentView with the given persistenceId and viewId
   */
  def apply[T: ClassTag](persistenceId: String, viewId: String): Source[T, ActorRef] =
    Source.actorPublisher[T](Props(new PersistentViewSource[T](persistenceId, viewId)))

  // Both messages are case objects so that they print as `DoUpdate` / `UpdateCompleted` (instead of an
  // opaque `PersistentViewSource$DoUpdate$@hash`) whenever they show up in unhandled-message or dead-letter logs.

  /** Message the publisher actor schedules to itself to trigger a new update from the journal. */
  private[PersistentViewSource] case object DoUpdate

  /** Message the publisher actor sends to itself right after an `Update`, marking the end of that update. */
  private[PersistentViewSource] case object UpdateCompleted
}
/**
 * [[akka.stream.actor.ActorPublisher]] that is also a [[akka.persistence.PersistentView]]: every replayed
 * message matching `T` is pushed downstream with `onNext`.
 *
 * Automatic updates are disabled; the actor instead sends itself an explicit `Update` whenever it decides to
 * fetch more events (on `Request`, on a scheduled `DoUpdate`, or right after a non-empty batch).
 *
 * @param persistenceId persistenceId of the view
 * @param viewId viewId of the view
 * @param tag class tag of `T`, used to decide whether a [[akka.persistence.SnapshotOffer]] may be emitted
 * @tparam T type of the elements emitted downstream
 */
class PersistentViewSource[T](override val persistenceId: String, override val viewId: String)(implicit tag: ClassTag[T])
  extends ActorPublisher[T] with PersistentView {

  // execution context required by the scheduler
  import context.dispatcher

  //disable auto update and recovery
  override val autoUpdate = false
  override val autoUpdateReplayMax = 0L

  // true when a SnapshotOffer is a valid element of type T
  val emitsSnapshot: Boolean = tag.runtimeClass.isAssignableFrom(classOf[SnapshotOffer])

  // snapshot received but not yet emitted downstream
  var snapshot: Option[SnapshotOffer] = None

  // handle of the pending automatic update, if any
  var scheduledUpdate: Option[Cancellable] = None

  // true between sending an Update to self and receiving the matching UpdateCompleted
  var updateRunning = false

  // number of items received in the last batch of persistent events
  var persistentUpdatesReceived = 0

  override def receive: Receive = {
    // the snapshot is only stored here; it is emitted by the next update request
    case ss: SnapshotOffer if emitsSnapshot ⇒ snapshot = Some(ss)
    // NOTE(review): persistent events that do not match T are not handled, hence not counted in
    // persistentUpdatesReceived; a batch made only of such events is treated as empty - confirm this is intended
    case elem: T if isPersistent ⇒ emitElement(elem)
    case Request(n) if !updateRunning ⇒ requestUpdate(n)
    case DoUpdate if !updateRunning ⇒ requestUpdate(totalDemand)
    case UpdateCompleted ⇒ scheduleNextUpdate()
    case Cancel ⇒ cancelAndClose()
  }

  /**
   * Emits an element downstream. We ask for updates only if we have requests so this should respect backpressure
   */
  def emitElement(elem: T): Unit = {
    persistentUpdatesReceived += 1
    onNext(elem)
  }

  /**
   * Schedules the next update after the previous one is completed. If the last update returned at least one element,
   * the next one is scheduled immediately. If the last one was empty, the next one will happen after [[autoUpdateInterval]]
   * or as soon as a new request comes in. Nothing is scheduled while there is no outstanding demand.
   */
  def scheduleNextUpdate(): Unit = {
    updateRunning = false
    if (totalDemand > 0 && persistentUpdatesReceived > 0) {
      log.debug("Immediately requesting more elements from the journal because last batch had size > 0")
      requestUpdate(totalDemand)
    } else if (totalDemand > 0) {
      log.debug("Nothing received in last batch of persistent events - Scheduling automatic update")
      scheduledUpdate = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, DoUpdate))
    }
  }

  /**
   * Enqueues an Update with await == true so that UpdateCompleted will be received only after the update completes.
   * More requests could be already enqueued and received before the Update message. We set updateRunning = true so
   * that those requests are ignored and taken care of only after this update completes. The same applies to the less
   * likely event of stashing a Request message received in between the Update and UpdateCompleted messages.
   *
   * A request for a non-positive number of items does not start an update: it can only come from a stale
   * DoUpdate processed after the demand has been fully satisfied, and emitting the snapshot or replaying
   * events without demand would violate the publisher contract.
   *
   * @param n number of elements that may be emitted, the pending snapshot (if any) included
   */
  def requestUpdate(n: Long): Unit = {
    // cancel the automatic update (if scheduled), it will be created again after this update request completes
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    if (n > 0) {
      // the snapshot, when present, consumes one unit of the requested demand
      val toRequest = if (emitSnapshot()) n - 1 else n
      log.debug("Requesting an update for {} items from the journal", toRequest)
      updateRunning = true
      persistentUpdatesReceived = 0
      self ! Update(await = true, replayMax = toRequest)
      self ! UpdateCompleted
    }
  }

  /**
   * Emits the pending snapshot downstream, if there is one.
   *
   * @return true if a snapshot has been emitted, false otherwise
   */
  def emitSnapshot(): Boolean = snapshot match {
    case Some(offer) ⇒
      // the cast cannot fail: a snapshot is stored only when emitsSnapshot is true
      onNext(offer.asInstanceOf[T])
      snapshot = None
      true
    case None ⇒ false
  }

  /**
   * Cancels scheduled updates and stops the actor
   */
  def cancelAndClose(): Unit = {
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    // stop right away instead of queueing a PoisonPill behind already enqueued Update messages
    context.stop(self)
  }

  override def postStop(): Unit = {
    scheduledUpdate foreach (_.cancel())
    scheduledUpdate = None
    // let the mixed-in traits run their own stop hooks as well
    super.postStop()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment