@lancearlaus
Last active August 29, 2015 14:20
Akka streams PushStage that enforces element ordering

import akka.stream.stage.{Context, PushStage, SyncDirective}

/**
* Enforces ordering on a stream, supporting ascending/descending and strict/non-strict options.
*
* This push stage accepts elements of type `T` and emits elements of type `Either[T, T]`.
* Emitted elements will be of type `Right` if properly ordered or of type `Left` otherwise.
* The stream of Right elements is guaranteed to have proper ordering semantics.
*
* For example, with strict ascending ordering, the input below produces the output shown.
* Note that the second C is rejected: each element is compared against the last accepted (Right) element,
* which is still the first C, not against the rejected B that precedes it.
*
* In:
* {{{ A, B, C, B, C, D }}}
*
* Out:
* {{{ Right(A), Right(B), Right(C), Left(B), Left(C), Right(D) }}}
*
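* @param ascending if true, require ascending order; if false, require descending order
* @param strict if true, reject an element equal to the last accepted element; if false, accept it
* @tparam T the element type, for which an implicit `Ordering` must be in scope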
*/
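class Ordered[T](ascending: Boolean = true, strict: Boolean = true)(implicit ordering: Ordering[T])
  extends PushStage[T, Either[T, T]] {

  // Most recent element emitted as a Right; None until the first element arrives.
  var last: Option[T] = None

  // True if `cur` may follow `last` under the configured direction and strictness.
  def isOrdered(last: T, cur: T) =
    if (ascending)
      if (strict) ordering.lt(last, cur)
      else ordering.lteq(last, cur)
    else
      if (strict) ordering.gt(last, cur)
      else ordering.gteq(last, cur)

  override def onPush(elem: T, ctx: Context[Either[T, T]]): SyncDirective = {
    // The first element is always accepted; later ones are checked against the last accepted element.
    val result = last.fold[Either[T, T]](Right(elem))(last => Either.cond(isOrdered(last, elem), elem, elem))
    // Only accepted elements advance the comparison baseline.
    if (result.isRight) last = Some(elem)
    ctx.push(result)
  }
}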
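
The ordering rule can be checked without running a stream. The following is a minimal sketch, not part of the gist: `OrderedSketch` and `classify` are hypothetical names, and the code applies the same comparison logic to a plain `Seq` using only the Scala standard library, so it should behave like the stage only in how it labels elements as `Left` or `Right`.

```scala
// Hypothetical stand-alone sketch: the same accept/reject rule as the stage,
// applied to an in-memory sequence instead of an Akka stream.
object OrderedSketch {

  def classify[T](elems: Seq[T], ascending: Boolean = true, strict: Boolean = true)
                 (implicit ordering: Ordering[T]): Seq[Either[T, T]] = {

    // True if `cur` may follow `prev` under the chosen direction and strictness.
    def isOrdered(prev: T, cur: T): Boolean =
      if (ascending) {
        if (strict) ordering.lt(prev, cur) else ordering.lteq(prev, cur)
      } else {
        if (strict) ordering.gt(prev, cur) else ordering.gteq(prev, cur)
      }

    // Fold over the input, carrying the last accepted element and the labelled output.
    val start = (Option.empty[T], Vector.empty[Either[T, T]])
    val (_, out) = elems.foldLeft(start) { case ((last, acc), elem) =>
      if (last.forall(isOrdered(_, elem))) (Some(elem), acc :+ Right(elem))
      else (last, acc :+ Left(elem))
    }
    out
  }

  def main(args: Array[String]): Unit = {
    // The example from the Scaladoc, with strict ascending ordering.
    println(classify(Seq("A", "B", "C", "B", "C", "D")))
    // prints Vector(Right(A), Right(B), Right(C), Left(B), Left(C), Right(D))
  }
}
```

With `strict = false` the second C would instead be accepted, since it is equal to the last accepted element and `lteq` holds.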