Skip to content

Instantly share code, notes, and snippets.

@hochgi
Created January 18, 2016 07:01
Show Gist options
  • Save hochgi/740ece39908e5155c627 to your computer and use it in GitHub Desktop.
Save hochgi/740ece39908e5155c627 to your computer and use it in GitHub Desktop.
Aggregation stage for akka-streams to serve as a "smart buffer", a kind of hybrid for conflate and buffer stages
object Aggregate {
private[this] val inc: Any ⇒ Long = _⇒1L
def apply[In,Out](max: Long, seed: In ⇒ Out)(aggregate: (Out, In) ⇒ Out) =
AggregateWeighted[In,Out](max, inc, seed)(aggregate)
}
object AggregateWeighted {
def apply[In,Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out)(aggregate: (Out, In) ⇒ Out) =
new AggregateWeighted(max, costFn, seed, aggregate)
}
class AggregateWeighted[In,Out] private(max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedStage[In, Out]{
private var agg: Any = null
private var left: Long = max
private var pending: Any = null
private[this] def flush(ctx: DetachedContext[Out]) = {
val result = agg.asInstanceOf[Out]
agg = null
left = max
if(pending != null) {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
}
ctx.pushAndPull(result)
}
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
val cost = costFn(elem)
if (agg == null) {
left -= cost
agg = seed(elem)
}
else if (left <= 0 || left - cost < 0) {
pending = elem
}
else {
left -= cost
agg = aggregate(agg.asInstanceOf[Out], elem)
}
if (!ctx.isHoldingDownstream && pending == null) ctx.pull()
else if(!ctx.isHoldingDownstream) ctx.holdUpstream()
else flush(ctx)
}
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (ctx.isFinishing) {
if (agg == null) ctx.finish()
else {
val result = agg.asInstanceOf[Out]
left = max
if (pending == null) {
agg = null
ctx.pushAndFinish(result)
}
else {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
ctx.push(result)
}
}
}
else if(ctx.isHoldingBoth) flush(ctx)
else if (agg == null) ctx.holdDownstream()
else {
val result = agg.asInstanceOf[Out]
left = max
if(pending != null) {
val elem = pending.asInstanceOf[In]
agg = seed(elem)
left -= costFn(elem)
pending = null
}
else {
agg = null
}
ctx.push(result)
}
}
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = ctx.absorbTermination()
override def restart(): AggregateWeighted[In, Out] = {
agg = null
left = max
pending = null
this
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment