@candera
Last active February 25, 2020 14:02
dopar
(require '[clojure.core.async :as async])

(defn dopar
  "Given a (potentially infinite) sequence `coll`, uses core.async to
  run `f` for side effects against each value in the collection.
  Performs at most `concur` operations in parallel, and never enqueues
  more than `lead` items ahead of the ones being consumed. If any call
  to `f` throws an exception, it will be rethrown from this function.
  Otherwise, returns nil. The optional `timeout-ms` is the number of
  milliseconds to wait for all operations to complete; if it elapses
  first, an ex-info with `:reason :timed-out` is thrown. Elements of
  `coll` must be truthy: the first nil or false element is treated as
  the end of the input."
  ([coll f concur lead] (dopar coll f concur lead nil))
  ([coll f concur lead timeout-ms]
   (let [req     (async/chan lead)
         resp    (async/chan concur)
         timeout (when timeout-ms (async/timeout timeout-ms))
         success (Object.)]
     ;; This loop creates `concur` go blocks, which may run in parallel
     (dotimes [_ concur]
       (async/go-loop []
         (when-let [val (async/<! req)]
           (try
             (f val)
             (async/>! resp success)
             (catch Throwable t
               (async/>! resp t)
               (async/close! req)
               (async/close! resp)
               ;; Can't return nil, as the value of the body of
               ;; go-loop is delivered on the channel that's
               ;; implicitly created, and one can't put nil on a
               ;; channel. Might as well return the exception.
               t))
           (recur))))
     ;; This loop puts work onto the request queue, takes work off
     ;; the response queue, and does bookkeeping.
     (loop [[head & more :as both] coll
            enqueued 0
            successes 0]
       (when (or head (< successes enqueued))
         (when-not head (async/close! req))
         ;; alts!! expects a vector of ports, so realize the filtered seq.
         (let [ports (vec (remove nil? [resp timeout (when head [req head])]))
               [val port] (async/alts!! ports)]
           (cond
             (= port req) (recur more (inc enqueued) successes)
             (= port resp) (cond
                             (= val success)
                             (recur both enqueued (inc successes))

                             ;; A worker reported a failure from `f`:
                             ;; shut down and rethrow it, as documented.
                             (instance? Throwable val)
                             (do (async/close! req)
                                 (async/close! resp)
                                 (throw val))

                             :else
                             (do (async/close! req)
                                 (async/close! resp)
                                 (throw (ex-info "Unexpected response"
                                                 {:reason :unexpected-response
                                                  :response val}))))
             (= port timeout) (do (async/close! req)
                                  (async/close! resp)
                                  (throw (ex-info "Operation timed out" {:reason :timed-out})))
             :else (do (async/close! req)
                       (async/close! resp)
                       (throw (ex-info "Unexpected result from alts!!"
                                       {:reason :wtf-alts
                                        :port port}))))))))))
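
A minimal usage sketch, assuming `dopar` from the listing above is already loaded in the current namespace. The names `seen` and `outcome` are hypothetical and exist only for this illustration. The first call exercises the success path; the second gives the workers more work than fits in the timeout, so it should end with the `:timed-out` ex-info that the function throws.

```clojure
;; Assumes `dopar` (defined above) is loaded in this namespace.

;; Success path: record every item, with at most 4 calls to `f` in
;; flight and at most 8 items queued ahead of the workers.
(def seen (atom #{}))                       ; hypothetical helper
(dopar (range 20) #(swap! seen conj %) 4 8)
(= @seen (set (range 20)))                  ; true once dopar has returned

;; Timeout path: 100 items at roughly 50 ms each across 4 workers
;; needs well over 200 ms, so the 200 ms timeout should fire first.
(def outcome                                ; hypothetical helper
  (try
    (dopar (range 100) (fn [_] (Thread/sleep 50)) 4 8 200)
    :finished
    (catch clojure.lang.ExceptionInfo e
      (:reason (ex-data e)))))
outcome                                     ; expected: :timed-out
```

Note that `f` runs inside go blocks, so a blocking `f` such as the `Thread/sleep` above ties up threads from core.async's shared dispatch pool; keeping `concur` small is the cautious choice when `f` blocks.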