@rrmckinley
Created October 10, 2013 21:39
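// Pairs each left element with the right element that `ord` reports as equal;
// left elements without a match are emitted with None. This merge-join shape
// presumes both inputs are sorted consistently with `ord`.
// Assumes scalaz.Ordering and the scalaz-stream combinators used below are in scope.
def yipWhenL[I,O,O2](ord: (I,O) => Ordering)(f: (I,Option[O]) => O2): Wye[I,O,O2] = {
  // Intended fallback once the right side is exhausted (see the TODO below); not wired in yet.
  val fbL = tee.passL[I] map (f(_,None : Option[O]))
  // io / oo hold the pending left and right elements, if any.
  def go(io : Option[I], oo : Option[O]): Wye[I,O,O2] = {
    io match {
      case None => awaitL[I].flatMap { i => go(Some(i), oo) }
      case Some(i) => oo match {
        case None => awaitR[O].flatMap(o => go(io,Some(o))) // TODO receiveROr(fbL)...
        case Some(o) => ord(i,o) match {
          case Ordering.LT => emit(f(i,None)) then go(None,oo) // left is behind: emit it unmatched, keep o
          case Ordering.EQ => emit(f(i,oo)) then go(None,None) // match: emit the pair, consume both
          case Ordering.GT => go(io,None)                      // right is behind: drop o, keep i
        }
      }
    }
  }
  go(None,None)
}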
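
To make the matching rule easier to check by hand, here is a rough, dependency-free sketch of the same LT / EQ / GT logic over plain Lists. It is not the scalaz-stream implementation: `YipWhenLSketch`, `Cmp` and `yipWhenLList` are hypothetical names made up for illustration, `Cmp` is a stand-in for scalaz's Ordering, and the sketch assumes that an exhausted input simply ends the merge, which is how the unguarded awaitR above appears to behave.

```scala
object YipWhenLSketch {
  // Hypothetical stand-in for scalaz.Ordering, so the sketch needs no dependencies.
  sealed trait Cmp
  case object LT extends Cmp
  case object EQ extends Cmp
  case object GT extends Cmp

  // List-based model of yipWhenL: same three-way decision, no streaming.
  def yipWhenLList[I, O, O2](ord: (I, O) => Cmp)(f: (I, Option[O]) => O2)(
      left: List[I], right: List[O]): List[O2] = {
    @annotation.tailrec
    def go(ls: List[I], rs: List[O], acc: List[O2]): List[O2] = (ls, rs) match {
      case (Nil, _) => acc.reverse // left exhausted: stop
      case (_, Nil) => acc.reverse // right exhausted: stop (assumption, no fallback yet)
      case (i :: lTail, o :: rTail) => ord(i, o) match {
        case LT => go(lTail, rs, f(i, None) :: acc)       // emit i unmatched, keep o
        case EQ => go(lTail, rTail, f(i, Some(o)) :: acc) // emit the pair, consume both
        case GT => go(ls, rTail, acc)                     // drop o, keep i
      }
    }
    go(left, right, Nil)
  }

  val byValue: (Int, Int) => Cmp =
    (i, o) => if (i < o) LT else if (i == o) EQ else GT

  val pair: (Int, Option[Int]) => (Int, Option[Int]) = (i, o) => (i, o)

  // List((1,None), (2,Some(2)), (4,None), (5,Some(5)))
  val demo: List[(Int, Option[Int])] =
    yipWhenLList[Int, Int, (Int, Option[Int])](byValue)(pair)(
      List(1, 2, 4, 5), List(0, 2, 3, 5, 7))

  // List((1,Some(1))): 2 and 3 are lost once the right side runs out
  val rightRunsOut: List[(Int, Option[Int])] =
    yipWhenLList[Int, Int, (Int, Option[Int])](byValue)(pair)(
      List(1, 2, 3), List(1))
}
```

The second result suggests why the TODO matters: under this reading, left elements that arrive after the right side ends are dropped rather than emitted with None, which is presumably what the fbL fallback is meant to fix.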
@pchiusano

fby is short for 'followedBy'. It's a semi-standard name in stream processing / dataflow programming. :)
