Skip to content

Instantly share code, notes, and snippets.

@easel
Created November 30, 2016 21:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save easel/171c73ee96cb122527f4897d58d0deea to your computer and use it in GitHub Desktop.
package com.theseventhsense.clients.wsclient
import akka.actor.{ Props, Status }
import akka.pattern.pipe
import akka.stream.actor.ActorPublisher
import com.theseventhsense.utils.persistence.Keyed
import scala.concurrent.ExecutionContext
/**
 * An [[akka.stream.actor.ActorPublisher]] that pages through a [[BatchLoader]],
 * buffering up to `BUFFER_AMOUNT` items ahead of downstream demand and
 * completing the stream once the loader reports no further offset.
 *
 * NOTE(review): `ActorPublisher` is deprecated in newer Akka versions; consider
 * migrating to `GraphStage` / `Source.unfoldAsync` when upgrading.
 */
class BatchSource[A <: Keyed](loader: BatchLoader[A])(implicit ec: ExecutionContext) extends ActorPublisher[A] {
import akka.stream.actor.ActorPublisherMessage._
// Soft cap on how many items we prefetch beyond current demand.
final val BUFFER_AMOUNT = 1000
// True until the first load has been issued; forces an initial fetch even
// though nextOffset is still None at that point.
private var first = true
// Paging token for the next batch; None means no further pages are known.
private var nextOffset: Option[String] = None
// Items loaded from the loader but not yet delivered downstream.
private var buffer: Seq[A] = Seq.empty
def receive: Receive = waitingForDownstreamReq(0)
// Internal self-message used to re-run the demand/delivery loop after a
// batch has been folded into the buffer.
case object Pull
// Fetch another page while one exists and we either have unmet demand or
// spare room in the prefetch buffer.
private def shouldLoadMore = {
nextOffset.isDefined && (totalDemand > 0 || buffer.length < BUFFER_AMOUNT)
}
/**
 * Behavior while no load is in flight. `offset` is the running count of
 * items delivered downstream so far.
 */
def waitingForDownstreamReq(offset: Long): Receive = {
case Request(_) | Pull =>
// FIX: run sendFromBuff even when the buffer is empty so the completion
// check inside it still fires once the final page has been drained.
// (Previously an empty buffer skipped it entirely, which could leave the
// stream waiting forever with pending demand.) sendFromBuff guards
// against completing before the first load or completing twice.
val sent = sendFromBuff(totalDemand)
if (first || (shouldLoadMore && isActive)) {
first = false
loader.load(nextOffset).pipeTo(self)
context.become(waitingForFut(offset + sent, totalDemand))
}
case Cancel => context.stop(self)
}
/**
 * Deliver up to `demand` buffered items downstream and return how many were
 * actually sent. Completes the stream when the buffer is drained and no
 * further page offset is known.
 */
def sendFromBuff(demand: Long): Long = {
val consumed = buffer.take(demand.toInt).toList
buffer = buffer.drop(consumed.length)
consumed.foreach(onNext)
// Guards: `!first` — never complete before the initial load has happened;
// `isActive` — never call onComplete twice (ActorPublisher would throw).
if (!first && isActive && nextOffset.isEmpty && buffer.isEmpty) {
onComplete()
}
consumed.length.toLong
}
/**
 * Behavior while a load is in flight. `s` is the running delivered count;
 * `beforeFutDemand` snapshots the demand observed when the load started.
 */
def waitingForFut(s: Long, beforeFutDemand: Long): Receive = {
// FIX: `Batch[A]` is type-erased at runtime, so the element type cannot be
// checked here; annotate with @unchecked to document that we trust the
// loader's element type (silences the compiler's unchecked warning).
case batch: Batch[A @unchecked] =>
// An empty page marks the end of the stream even if it carries an offset.
nextOffset = if (batch.items.isEmpty) {
None
} else {
batch.nextOffset
}
buffer = buffer ++ batch.items
val consumed = sendFromBuff(beforeFutDemand)
// Re-enter the demand loop to serve any demand accumulated while loading.
self ! Pull
context.become(waitingForDownstreamReq(s + consumed))
case Request(_) | Pull => // ignoring until we receive the future response
case Status.Failure(err) =>
context.become(waitingForDownstreamReq(s))
onError(err)
case Cancel => context.stop(self)
}
}
/** Companion factory for [[BatchSource]]. */
object BatchSource {
/**
 * Builds the [[Props]] for a [[BatchSource]] actor publishing items of
 * type `T`, backed by the given loader.
 */
def props[T <: Keyed](loader: BatchLoader[T])(implicit ec: ExecutionContext): Props =
Props(new BatchSource[T](loader))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment