Skip to content

Instantly share code, notes, and snippets.

@robinp
Created September 12, 2012 08:47
Show Gist options
  • Save robinp/3705334 to your computer and use it in GitHub Desktop.
Save robinp/3705334 to your computer and use it in GitHub Desktop.
scala running merge of streams
object merging {
// Note: Tail-recursive stream functions should always be defined on objects, not traits.
// see http://stackoverflow.com/questions/12486762/scala-tail-recursive-stream-processor-function-defined-in-trait-holds-reference
/**
* Merges two ordered streams. The elements can be of different
* types, but they need a mapping to a common type C.
*
* The mapped streams must be ordered ascending.
*
* The mapped streams should contain distinct values.
*
* @param as first stream to merge
* @param bs second stream to merge
* @param f maps elements of the first stream to an ordering key
* @param g maps elements of the second stream to an ordering key
* @return the merged stream where pairs constitue elements with equal key.
* unpaired elements are dropped.
*/
@tailrec
final def merge[A, B, C](as: Stream[A], bs: Stream[B])(f: A => C, g: B => C)(
implicit C: Ordering[C]): Stream[(A, B)] =
(as, bs) match {
case (a #:: atails, b #:: btails) =>
val keyA = f(a)
val keyB = g(b)
if (C.equiv(keyA, keyB)) {
val atailRef = new AtomicReference(atails)
val btailRef = new AtomicReference(btails)
// tail calls end here, return a result
(a, b) #:: mergeLazy(atailRef, btailRef)(f, g)
}
else if (C.lt(keyA, keyB)) merge(atails, bs)(f, g)
else merge(as, btails)(f, g)
case _ /** Some of the streams is empty */ => Stream.empty
}
/**
* Needed to make the compiler believe the tailcallness of merge.
* Care is taken to avoid holding a reference to the stream heads (see inline comments).
*/
final def mergeLazy[A, B, C](asRef: AtomicReference[Stream[A]], bsRef: AtomicReference[Stream[B]])(f: A => C, g: B => C)(
implicit C: Ordering[C]) = {
// Passes the stream head references to the merge function and immediately deletes then from the holder,
// so that the mergeLazy stack frame doesn't keep them referred and they can be GC'd while the call is running.
//
// Note: don't put the unwrapped streams into local variables, obviously.
merge(asRef.getAndSet(null), bsRef.getAndSet(null))(f, g)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment