@ericnormand
Last active October 21, 2019 14:08

parallel execution

If you need to run an action on all elements of a collection, you can use run!.

(run! println (range 100))

This runs all of the println calls in sequence, one after another on the calling thread. There's no built-in function that runs those calls in parallel. That's your task this week.

Write a function runp! that takes a function and a sequence of arguments. It will run the function on different threads and wait for them all to finish. It returns nil, just like run!.
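The two requirements above (wait for every call to finish, return nil) can be turned into a small check. This is only a sketch: `check-runp` is a hypothetical helper, not part of the challenge, and the implementation passed to it here is a sequential stand-in used just to exercise the check.

```clojure
;; Hypothetical helper: checks the two stated properties for any
;; candidate implementation passed in as `runp-impl`.
(defn check-runp [runp-impl]
  (let [seen   (atom #{})
        ;; Each call records its argument; conj on a set inside swap! is thread-safe.
        result (runp-impl #(swap! seen conj %) (range 100))]
    {:returns-nil? (nil? result)
     ;; Checked right after the call returns, so this fails if the
     ;; implementation does not wait for all calls to finish.
     :all-ran?     (= (set (range 100)) @seen)}))

;; Sequential stand-in, assumed only for illustration.
(check-runp (fn [f coll] (run! f coll)))
;; => {:returns-nil? true, :all-ran? true}
```

A real solution can be dropped in by calling `(check-runp runp!)` once it is defined.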

(defn runp! [f coll]
  ;; your implementation goes here
  )

Please be mindful of the number of threads it creates. For instance, if I create 4000 threads on my machine, it crashes the OS. The typical solution is to use a thread pool.
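To see why a thread pool addresses this, here is a minimal sketch, assuming a fixed pool of 4 threads (an arbitrary size chosen for illustration). `threads-used` is a hypothetical helper that submits many tasks and counts how many distinct threads actually ran them.

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

;; Hypothetical helper: runs `n-tasks` tasks on a pool of `pool-size` threads
;; and returns how many distinct threads did the work.
(defn threads-used [pool-size n-tasks]
  (let [^ExecutorService pool (Executors/newFixedThreadPool pool-size)
        names (atom #{})
        ;; Clojure functions implement Callable, so they can be passed to invokeAll.
        tasks (mapv (fn [_]
                      (fn [] (swap! names conj (.getName (Thread/currentThread)))))
                    (range n-tasks))]
    (try
      ;; invokeAll blocks until every task is done; .get surfaces any failure.
      (run! (fn [^Future fut] (.get fut)) (.invokeAll pool tasks))
      (count @names)
      (finally
        (.shutdown pool)))))

(<= (threads-used 4 1000) 4)
;; => true
```

However many tasks are queued, the pool never starts more than `pool-size` threads; the remaining tasks wait in the pool's queue.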

(defn runp!
  "Runs the function `f` in parallel on the given collection `coll`.
  By default it uses as many threads as there are cores available on the system.
  Pass `threads` as a third argument to set a specific number of threads."
  ([f coll]
   (runp! f coll (.availableProcessors (Runtime/getRuntime))))
  ([f coll threads]
   (let [pool (java.util.concurrent.Executors/newFixedThreadPool threads)
         ;; Clojure functions implement Callable, so invokeAll accepts them.
         tasks (map (fn [e] (fn [] (f e))) coll)]
     (try
       ;; .get rethrows any task failure as an ExecutionException.
       (doseq [future (.invokeAll pool tasks)]
         (.get future))
       (finally
         (.shutdown pool))))))

;; with timeout
(defn runp!
  "Runs the function `f` in parallel on the given collection `coll`.
  By default it uses as many threads as there are cores available on the system.
  Options: `:threads` sets the number of threads; `:timeout` is the maximum
  time to wait, in milliseconds."
  ([f coll]
   (runp! f coll {}))
  ([f coll {:keys [threads timeout] :or {threads (.availableProcessors (Runtime/getRuntime))}}]
   (let [pool (java.util.concurrent.Executors/newFixedThreadPool threads)
         tasks (map (fn [e] (fn [] (f e))) coll)]
     (try
       ;; The untimed invokeAll blocks until every task is done, so the timeout
       ;; has to be applied here: the timed overload cancels unfinished tasks.
       (doseq [future (if timeout
                        (.invokeAll pool tasks timeout java.util.concurrent.TimeUnit/MILLISECONDS)
                        (.invokeAll pool tasks))]
         ;; Calling .get on a cancelled future would throw CancellationException.
         (when-not (.isCancelled future)
           (.get future)))
       (catch InterruptedException e
         (.shutdownNow pool)
         nil)
       (finally
         (.shutdown pool))))))

(defn runp! [f coll]
  (doall (pmap f coll))
  nil)

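(import '[java.util.concurrent Executors])
(defn runp! [f coll]
  ;; Note: a cached pool creates threads on demand with no upper bound,
  ;; so a large coll can start one thread per element.
  (let [pool (Executors/newCachedThreadPool)
        tasks (map #(fn [] (f %)) coll)]
    ;; invokeAll blocks until every task has completed; task exceptions stay
    ;; inside the returned futures and are not rethrown here.
    (.invokeAll pool tasks)
    (.shutdown pool))
  nil)
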
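;; One shared pool, sized to the number of cores and created once.
;; It is never shut down, so its non-daemon threads keep the JVM alive
;; until the process exits explicitly.
(defonce ^{:private true}
  runp-pool (java.util.concurrent.Executors/newFixedThreadPool (.availableProcessors (Runtime/getRuntime))))
(defn runp! [f coll]
  (->> coll
       (map #(fn [] (f %)))
       (.invokeAll runp-pool)
       ;; run! returns nil; .get rethrows task failures as ExecutionException.
       (run! #(.get %))))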