Skip to content

Instantly share code, notes, and snippets.

@aphyr
Created December 10, 2018 14:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aphyr/61085facf46d87b568e0396196688834 to your computer and use it in GitHub Desktop.
Save aphyr/61085facf46d87b568e0396196688834 to your computer and use it in GitHub Desktop.
(defn real-pmap-with-early-abort
"Like real-pmap, but as soon as one thread throws an exception, all threads
are interrupted, and the original exception is rethrown. This is particularly
helpful when you want to do some all-or-nothing work, and the failure of any
piece means the entire results are invalid--or if the crash of one thread
could deadlock other threads."
[f coll]
(let [exception (promise)
thread-group (ThreadGroup. "real-pmap-with-early-abort")
results (vec (take n (repeatedly promise)))
threads (mapv (fn build-thread [x result]
(Thread. thread-group
(bound-fn []
(try (deliver result (f x))
(catch Throwable t
; Note that we're not necessarily
; guaranteed to execute this code. If
; our call of (f x) throws, we could
; wind up here, and then another
; thread with a failure could
; interrupt us, causing us to jump
; out of this catch block.
(deliver exception t)
(.interrupt thread-group))))))
coll
results)]
; Launch threads
(doseq [t threads] (.start t))
; Wait for completion. Normally I'd await a barrier, but because of the way
; we interrupt threads, I don't think there's any point where we *could*
; join a barrier. What we *can* do reliably, though, is join the thread.
; That can throw InterruptedException, so we catch that, check that it's
; really dead, and if so, move on. If we get interrupted and the thread
; *isn't* dead, then it's probably that someone interrupted this
; real-pmap-with-early-abort call, rather than the underlying thread, and
; we interrupt the thread group (just in case), and rethrow our own
; interrupt.
(doseq [t threads]
(try
(.join t)
(catch InterruptedException e
(when (.isAlive t)
; We were interrupted by an outside force, not the thread we
; were joining. Clean up our thread group and rethrow.
(.interrupt thread-group)
(throw e)))))
; OK, all threads are now dead. If any observed an exception, throw that.
(when-let [ex @exception]
(throw ex))
; Otherwise, return results!
(mapv deref results)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment