Created
December 10, 2018 14:57
-
-
Save aphyr/61085facf46d87b568e0396196688834 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn real-pmap-with-early-abort
  "Like real-pmap, but as soon as one thread throws an exception, all threads
  are interrupted, and the original exception is rethrown. This is particularly
  helpful when you want to do some all-or-nothing work, and the failure of any
  piece means the entire results are invalid--or if the crash of one thread
  could deadlock other threads.

  Calls (f x) for every x in coll, each call on its own Thread inside a
  dedicated ThreadGroup, and blocks until every thread has died.

  Returns a vector of the results, in the same order as coll (an empty vector
  for an empty or nil coll).

  Throws the first Throwable delivered by any worker thread. Throws
  InterruptedException if the calling thread is interrupted while a worker is
  still alive; in that case the worker threads are interrupted first."
  [f coll]
  (let [; Holds the first Throwable seen by any worker. Promises can only be
        ; delivered once, so later failures (including InterruptedExceptions
        ; caused by our own abort) never overwrite the original cause.
        exception    (promise)
        thread-group (ThreadGroup. "real-pmap-with-early-abort")
        ; One promise per input element, positionally matched with coll.
        results      (mapv (fn [_] (promise)) coll)
        threads      (mapv (fn build-thread [x result]
                             (Thread. thread-group
                                      ; bound-fn conveys the caller's dynamic
                                      ; var bindings to the worker thread.
                                      (bound-fn []
                                        (try (deliver result (f x))
                                             (catch Throwable t
                                               ; Note that we're not necessarily
                                               ; guaranteed to execute this
                                               ; code. If our call of (f x)
                                               ; throws, we could wind up here,
                                               ; and then another thread with a
                                               ; failure could interrupt us,
                                               ; causing us to jump out of this
                                               ; catch block.
                                               (deliver exception t)
                                               (.interrupt thread-group))))))
                           coll
                           results)]
    ; Launch threads
    (doseq [t threads] (.start t))
    ; Wait for completion. Normally I'd await a barrier, but because of the way
    ; we interrupt threads, I don't think there's any point where we *could*
    ; join a barrier. What we *can* do reliably, though, is join the thread.
    ; That can throw InterruptedException, so we catch that, check that it's
    ; really dead, and if so, move on. If we get interrupted and the thread
    ; *isn't* dead, then it's probably that someone interrupted this
    ; real-pmap-with-early-abort call, rather than the underlying thread, and
    ; we interrupt the thread group (just in case), and rethrow our own
    ; interrupt.
    (doseq [t threads]
      (try
        (.join t)
        (catch InterruptedException e
          (when (.isAlive t)
            ; We were interrupted by an outside force, not the thread we
            ; were joining. Clean up our thread group and rethrow.
            (.interrupt thread-group)
            (throw e)))))
    ; OK, all threads are now dead. If any observed an exception, throw that.
    ; We must check realized? first: dereferencing an undelivered promise
    ; blocks forever, which would hang every successful call.
    (when (realized? exception)
      (throw @exception))
    ; Otherwise, return results!
    (mapv deref results)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment