Created
November 21, 2016 18:15
-
-
Save edmundnoble/690167883e7ea2d1837894ab042e937e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Starts every task in `ob` and streams the running fold of their results,
  * in the order in which the tasks happen to complete.
  *
  * The returned observable first emits the seed `z`. Each time a task
  * succeeds, the accumulator is updated with `fold(accumulator, result)` and
  * the new accumulator is emitted. Once every task has succeeded the stream
  * is completed.
  *
  * If a task fails (or `fold` itself throws a non-fatal exception) while the
  * stream is still active, the connection holding the tasks' cancelables is
  * cancelled and the error is signalled via `onError`. Task failures arriving
  * after the stream has terminated are handed to the scheduler's
  * `reportFailure`.
  *
  * NOTE(review): the results of `subscriber.onNext` are discarded; if they
  * carry a back-pressure acknowledgement it is not honoured here — confirm
  * this is acceptable for the intended subscribers.
  *
  * @param ob   the tasks to run
  * @param z    the initial accumulator; also the first emitted element
  * @param fold combines the current accumulator with a task result
  * @tparam A   the result type of the tasks
  * @tparam B   the accumulator / emitted element type
  * @return an observable of the successive accumulator values
  */
def raceFold[A, B](ob: Seq[Task[A]])(z: B)(fold: (B, A) => B): Observable[B] = {
  import scala.util.control.NonFatal

  Observable.unsafeCreate { subscriber =>
    // We need a monitor to synchronize on, per evaluation!
    val lock = new AnyRef
    val conn = StackedCancelable()

    // Forces a fork on another (logical) thread!
    subscriber.scheduler.executeAsync { () =>
      val _ = lock.synchronized {
        // Keeps track of tasks remaining to be completed.
        // Is initialized by 1 because of the logic - tasks can run synchronously,
        // and we decrement this value whenever one finishes, so we must prevent
        // zero values before the loop is done.
        // MUST BE synchronized by `lock`!
        var remaining = 1

        // If this variable is false, then the stream has already terminated
        // (either a task ended in error or the final completion was signalled).
        // MUST BE synchronized by `lock`!
        var isActive = true

        // Current accumulator value.
        // MUST BE synchronized by `lock`!
        var cur: B = z
        subscriber.onNext(z)

        // Decrements the pending counter and completes the stream when it
        // reaches zero.
        // MUST BE synchronized by `lock`!
        // MUST NOT BE called if isActive == false!
        @inline def maybeSignalFinal(conn: StackedCancelable)
          (implicit s: Scheduler): Unit = {
          remaining -= 1
          if (remaining == 0) {
            subscriber.onComplete()
            isActive = false
            val _ = conn.pop()
          }
        }

        // Terminates the stream with `ex` if it is still active, cancelling
        // whatever is on top of `conn`; otherwise just reports the error.
        // MUST BE synchronized by `lock`!
        def signalError(ex: Throwable): Unit =
          if (isActive) {
            isActive = false
            // This should cancel our CompositeCancelable
            conn.pop().cancel()
            subscriber.onError(ex)
          } else {
            subscriber.scheduler.reportFailure(ex)
          }

        implicit val s: Scheduler = subscriber.scheduler

        // Represents the collection of cancelables for all started tasks
        val composite = CompositeCancelable()
        conn.push(composite)

        // Collecting all cancelables in a buffer, because adding
        // cancelables one by one in our `CompositeCancelable` is
        // expensive, so we do it at the end
        val allCancelables = ListBuffer.empty[StackedCancelable]
        val cursor = ob.iterator

        // The `isActive` check short-circuits the process in case
        // we have a synchronous task that just completed in error
        while (cursor.hasNext && isActive) {
          remaining += 1
          val task = cursor.next()
          val stacked = StackedCancelable()
          allCancelables += stacked

          // Light asynchronous boundary; with most scheduler implementations
          // it will not fork a new (logical) thread!
          subscriber.scheduler.executeTrampolined(() =>
            Task.unsafeStartNow(task, Task.Context(subscriber.scheduler, stacked, monix.execution.misc.ThreadLocal(1), Task.Options(false)),
              new Callback[A] {
                def onSuccess(value: A): Unit =
                  lock.synchronized {
                    if (isActive) {
                      // `fold` is user-supplied code: a non-fatal exception
                      // thrown by it must terminate the stream with an error
                      // instead of escaping from this callback. Only the
                      // `fold` call is guarded, so exceptions thrown by the
                      // subscriber itself are not turned into `onError`.
                      val folded: Option[B] =
                        try Some(fold(cur, value))
                        catch {
                          case NonFatal(ex) =>
                            signalError(ex)
                            None
                        }

                      folded.foreach { newValue =>
                        cur = newValue
                        subscriber.onNext(newValue)
                        maybeSignalFinal(conn)
                      }
                    }
                  }

                def onError(ex: Throwable): Unit =
                  lock.synchronized {
                    signalError(ex)
                  }
              }))
        }

        // All tasks could have executed synchronously, so we might be
        // finished already. If so, then trigger the final callback.
        // After an error the failed task never decremented `remaining`,
        // so the counter cannot reach zero here and no completion is sent.
        maybeSignalFinal(conn)

        // Note that if an error happened, this should cancel all
        // other active tasks.
        composite ++= allCancelables
      }
    }

    conn
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment