Skip to content

Instantly share code, notes, and snippets.

@rkuhn
Created November 18, 2015 16:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkuhn/e3c7e5ca71906519e3f7 to your computer and use it in GitHub Desktop.
Save rkuhn/e3c7e5ca71906519e3f7 to your computer and use it in GitHub Desktop.
MergeSorted
class MergeSorted[T: Ordering] extends GraphStage[FanInShape2[T, T, T]] {
private val left = Inlet[T]("left")
private val right = Inlet[T]("right")
private val out = Outlet[T]("out")
override val shape = new FanInShape2(left, right, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
import Ordering.Implicits._
setHandler(left, ignoreTerminateInput)
setHandler(right, ignoreTerminateInput)
setHandler(out, eagerTerminateOutput)
var other: T = _
def nullOut(): Unit = other = null.asInstanceOf[T]
def dispatch(l: T, r: T): Unit =
if (l < r) { other = r; emit(out, l, readL) }
else { other = l; emit(out, r, readR) }
val dispatchR = dispatch(other, _: T)
val dispatchL = dispatch(_: T, other)
val passR = () => emit(out, other, () => { nullOut(); passAlong(right, out, doPull = true) })
val passL = () => emit(out, other, () => { nullOut(); passAlong(left, out, doPull = true) })
val readR = () => read(right)(dispatchR, passL)
val readL = () => read(left)(dispatchL, passR)
override def preStart(): Unit =
read(left)(l => {
other = l
readR()
}, () => passAlong(right, out, doPull = true))
}
}
diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
index 7d62676..01a7516 100644
--- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
+++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
@@ -428,12 +428,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* for the given inlet if suspension is needed and reinstalls the current
* handler upon receiving the last `onPush()` signal (before invoking the `andThen` function).
*/
- final protected def readN[T](in: Inlet[T], n: Int)(andThen: Seq[T] ⇒ Unit): Unit =
+ final protected def readN[T](in: Inlet[T], n: Int)(andThen: Seq[T] ⇒ Unit, onClose: Seq[T] => Unit): Unit =
if (n < 0) throw new IllegalArgumentException("cannot read negative number of elements")
else if (n == 0) andThen(Nil)
else {
val result = new ArrayBuffer[T](n)
var pos = 0
+ def realAndThen = (elem: T) => {
+ result(pos) = elem
+ pos += 1
+ if (pos == n) andThen(result)
+ }
+ def realOnClose = () => onClose(result.take(pos))
+
if (isAvailable(in)) {
val elem = grab(in)
result(0) = elem
@@ -443,20 +450,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
pos = 1
requireNotReading(in)
pull(in)
- setHandler(in, new Reading(in, n - 1, getHandler(in))(elem ⇒ {
- result(pos) = elem
- pos += 1
- if (pos == n) andThen(result)
- }))
+ setHandler(in, new Reading(in, n - 1, getHandler(in))(realAndThen, realOnClose))
}
} else {
requireNotReading(in)
if (!hasBeenPulled(in)) pull(in)
- setHandler(in, new Reading(in, n, getHandler(in))(elem ⇒ {
- result(pos) = elem
- pos += 1
- if (pos == n) andThen(result)
- }))
+ setHandler(in, new Reading(in, n, getHandler(in))(realAndThen, realOnClose))
}
}
@@ -466,14 +465,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* for the given inlet if suspension is needed and reinstalls the current
* handler upon receiving the `onPush()` signal (before invoking the `andThen` function).
*/
- final protected def read[T](in: Inlet[T])(andThen: T ⇒ Unit): Unit = {
+ final protected def read[T](in: Inlet[T])(andThen: T ⇒ Unit, onClose: () => Unit): Unit = {
if (isAvailable(in)) {
val elem = grab(in)
andThen(elem)
+ } else if (isClosed(in)) {
+ onClose()
} else {
requireNotReading(in)
if (!hasBeenPulled(in)) pull(in)
- setHandler(in, new Reading(in, 1, getHandler(in))(andThen))
+ setHandler(in, new Reading(in, 1, getHandler(in))(andThen, onClose))
}
}
@@ -497,7 +498,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Caution: for n==1 andThen is called after resetting the handler, for
* other values it is called without resetting the handler.
*/
- private class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)(andThen: T ⇒ Unit) extends InHandler {
+ private class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)(andThen: T ⇒ Unit, onClose: () => Unit) extends InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (n == 1) setHandler(in, previous)
@@ -507,8 +508,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
andThen(elem)
}
- override def onUpstreamFinish(): Unit = previous.onUpstreamFinish()
- override def onUpstreamFailure(ex: Throwable): Unit = previous.onUpstreamFailure(ex)
+ override def onUpstreamFinish(): Unit = {
+ setHandler(in, previous)
+ onClose()
+ previous.onUpstreamFinish()
+ }
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ setHandler(in, previous)
+ previous.onUpstreamFailure(ex)
+ }
}
/**
@@ -648,7 +656,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
private class EmittingSingle[T](_out: Outlet[T], elem: T, _previous: OutHandler, _andThen: () ⇒ Unit)
- extends Emitting(_out, _previous, _andThen) {
+ extends Emitting(_out, _previous, _andThen) {
override def onPull(): Unit = {
push(out, elem)
@@ -657,7 +665,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
private class EmittingIterator[T](_out: Outlet[T], elems: Iterator[T], _previous: OutHandler, _andThen: () ⇒ Unit)
- extends Emitting(_out, _previous, _andThen) {
+ extends Emitting(_out, _previous, _andThen) {
override def onPull(): Unit = {
push(out, elems.next())
@@ -676,7 +684,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* given outlet before pulling for more data. `doTerminate` controls whether
* completion or failure of the given inlet shall lead to stage termination or not.
*/
- final protected def passAlong[Out, In <: Out](from: Inlet[In], to: Outlet[Out], doFinish: Boolean, doFail: Boolean): Unit =
+ final protected def passAlong[Out, In <: Out](from: Inlet[In], to: Outlet[Out],
+ doFinish: Boolean = true, doFail: Boolean = true,
+ doPull: Boolean = false): Unit = {
setHandler(from, new InHandler {
val puller = () ⇒ tryPull(from)
override def onPush(): Unit = {
@@ -686,6 +696,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
override def onUpstreamFinish(): Unit = if (doFinish) super.onUpstreamFinish()
override def onUpstreamFailure(ex: Throwable): Unit = if (doFail) super.onUpstreamFailure(ex)
})
+ if (doPull) tryPull(from)
+ }
/**
* Obtain a callback object that can be used asynchronously to re-enter the
@rkuhn
Copy link
Author

rkuhn commented Nov 18, 2015

The patch is a minor touch-up over 2.0-M1 that allows read to register an onClose callback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment