Skip to content

Instantly share code, notes, and snippets.

@joescii
Created July 14, 2016 17:54
Show Gist options
  • Save joescii/115b494d793ba33c12619fed6e034448 to your computer and use it in GitHub Desktop.
Save joescii/115b494d793ba33c12619fed6e034448 to your computer and use it in GitHub Desktop.
Modified reorderLocally for my purposes
def reorderLocally[I](f: I => Long, range:Long):Process1[I, I] = { // TODO: Can we replace Long with something like A: Order ?
import scala.collection.immutable.SortedMap
def emitMapValues(m: SortedMap[Long, Vector[I]]) =
emitAll(m.foldLeft(Vector.empty[I]) { case (acc, (_, tss)) => acc ++ tss })
def go(buffered: SortedMap[Long, Vector[I]]): Process1[I, I] = {
receive1Or[I, I](emitMapValues(buffered)) { t =>
val until = f(t) - range
val (toEmit, toBuffer) = buffered span { case (x, _) => x <= until }
val updatedBuffer = toBuffer + (f(t) -> (toBuffer.getOrElse(f(t), Vector.empty[I]) :+ t))
emitMapValues(toEmit) ++ go(updatedBuffer)
}
}
go(SortedMap.empty)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment