Created
January 18, 2016 07:01
-
-
Save hochgi/740ece39908e5155c627 to your computer and use it in GitHub Desktop.
Aggregation stage for akka-streams to serve as a "smart buffer", a kind of hybrid for conflate and buffer stages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Aggregate { | |
private[this] val inc: Any ⇒ Long = _⇒1L | |
def apply[In,Out](max: Long, seed: In ⇒ Out)(aggregate: (Out, In) ⇒ Out) = | |
AggregateWeighted[In,Out](max, inc, seed)(aggregate) | |
} | |
object AggregateWeighted { | |
def apply[In,Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out)(aggregate: (Out, In) ⇒ Out) = | |
new AggregateWeighted(max, costFn, seed, aggregate) | |
} | |
class AggregateWeighted[In,Out] private(max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedStage[In, Out]{ | |
private var agg: Any = null | |
private var left: Long = max | |
private var pending: Any = null | |
private[this] def flush(ctx: DetachedContext[Out]) = { | |
val result = agg.asInstanceOf[Out] | |
agg = null | |
left = max | |
if(pending != null) { | |
val elem = pending.asInstanceOf[In] | |
agg = seed(elem) | |
left -= costFn(elem) | |
pending = null | |
} | |
ctx.pushAndPull(result) | |
} | |
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = { | |
val cost = costFn(elem) | |
if (agg == null) { | |
left -= cost | |
agg = seed(elem) | |
} | |
else if (left <= 0 || left - cost < 0) { | |
pending = elem | |
} | |
else { | |
left -= cost | |
agg = aggregate(agg.asInstanceOf[Out], elem) | |
} | |
if (!ctx.isHoldingDownstream && pending == null) ctx.pull() | |
else if(!ctx.isHoldingDownstream) ctx.holdUpstream() | |
else flush(ctx) | |
} | |
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = { | |
if (ctx.isFinishing) { | |
if (agg == null) ctx.finish() | |
else { | |
val result = agg.asInstanceOf[Out] | |
left = max | |
if (pending == null) { | |
agg = null | |
ctx.pushAndFinish(result) | |
} | |
else { | |
val elem = pending.asInstanceOf[In] | |
agg = seed(elem) | |
left -= costFn(elem) | |
pending = null | |
ctx.push(result) | |
} | |
} | |
} | |
else if(ctx.isHoldingBoth) flush(ctx) | |
else if (agg == null) ctx.holdDownstream() | |
else { | |
val result = agg.asInstanceOf[Out] | |
left = max | |
if(pending != null) { | |
val elem = pending.asInstanceOf[In] | |
agg = seed(elem) | |
left -= costFn(elem) | |
pending = null | |
} | |
else { | |
agg = null | |
} | |
ctx.push(result) | |
} | |
} | |
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = ctx.absorbTermination() | |
override def restart(): AggregateWeighted[In, Out] = { | |
agg = null | |
left = max | |
pending = null | |
this | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment