Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
(defn real-pmap-with-early-abort
"Like real-pmap, but as soon as one thread throws an exception, all threads
are interrupted, and the original exception is rethrown. This is particularly
helpful when you want to do some all-or-nothing work, and the failure of any
piece means the entire results are invalid--or if the crash of one thread
could deadlock other threads."
[f coll]
(let [exception (promise)
thread-group (ThreadGroup. "real-pmap-with-early-abort")
results (vec (take n (repeatedly promise)))
threads (mapv (fn build-thread [x result]
(Thread. thread-group
(bound-fn []
(try (deliver result (f x))
(catch Throwable t
; Note that we're not necessarily
; guaranteed to execute this code. If
; our call of (f x) throws, we could
; wind up here, and then another
; thread with a failure could
; interrupt us, causing us to jump
; out of this catch block.
(deliver exception t)
(.interrupt thread-group))))))
coll
results)]
; Launch threads
(doseq [t threads] (.start t))
; Wait for completion. Normally I'd await a barrier, but because of the way
; we interrupt threads, I don't think there's any point where we *could*
; join a barrier. What we *can* do reliably, though, is join the thread.
; That can throw InterruptedException, so we catch that, check that it's
; really dead, and if so, move on. If we get interrupted and the thread
; *isn't* dead, then it's probably that someone interrupted this
; real-pmap-with-early-abort call, rather than the underlying thread, and
; we interrupt the thread group (just in case), and rethrow our own
; interrupt.
(doseq [t threads]
(try
(.join t)
(catch InterruptedException e
(when (.isAlive t)
; We were interrupted by an outside force, not the thread we
; were joining. Clean up our thread group and rethrow.
(.interrupt thread-group)
(throw e)))))
; OK, all threads are now dead. If any observed an exception, throw that.
(when-let [ex @exception]
(throw ex))
; Otherwise, return results!
(mapv deref results)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.