;; This document demonstrates a common misunderstanding of how
;; clojure.core.async's go blocks handle parallel processing.
(require '[clojure.core.async :as a])
;; Non-blocking core.async example.
(time (let [n 1000
            vals-coll (atom [])]
        (let [pipeline-chan (a/chan 1)]
          (dotimes [i n] (a/go (a/>! pipeline-chan i)))
          (dotimes [_ n] (let [v (a/<!! pipeline-chan)]
                           (swap! vals-coll conj v)))
          ;; make sure all values made it through the pipe
          (assert (= (set (range n)) (set @vals-coll))))))
;; approximately 36ms
(time (let [n 1000]
        (let [pipeline-queue (atom clojure.lang.PersistentQueue/EMPTY)
              vals-coll (atom [])]
          (dotimes [i n] (future (swap! pipeline-queue conj i)))
          (dotimes [_ n]
            (swap! vals-coll conj (peek @pipeline-queue))
            (swap! pipeline-queue pop))
          ;; make sure all values made it through the pipe
          (assert (= (set (range n)) (set @vals-coll))))))
;; approximately 8ms
;; The future-based version is roughly 4-5x faster, but both times are very small.
;;; The next section uses the same code blocks as above, but with blocking
;;; added inside the thread/go blocks.
;; With blocking added to the go blocks, the total time now grows by ~2 seconds
;; each time n crosses a multiple of 8. This is because all 8 core.async dispatch
;; threads are tied up in the blocking call, so the next batch of go blocks cannot
;; run until the current batch completes, at which point that batch blocks in turn.
;; i.e.
;; n < 9 => ~2 seconds
;; 9 <= n < 17 => ~4 seconds
;; 17 <= n < 25 => ~6 seconds
;; 25 <= n < 33 => ~8 seconds
;; ...
(time (let [n 17
            vals-coll (atom [])]
        (let [pipeline-chan (a/chan 1)]
          (dotimes [i n] (a/go (Thread/sleep 2000) ;; add blocking
                               (a/>! pipeline-chan i)))
          (dotimes [_ n] (let [v (a/<!! pipeline-chan)]
                           (swap! vals-coll conj v)))
          ;; make sure all values made it through the pipe
          (assert (= (set (range n)) (set @vals-coll))))))
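;; A hedged aside (not part of the original gist): the batching by 8 above comes
;; from core.async's fixed go-block dispatch pool. At the time of writing its size
;; is read from the "clojure.core.async.pool-size" JVM system property (default 8),
;; so it can only be changed before core.async is loaded, e.g.
;;   java -Dclojure.core.async.pool-size=32 ...
;; Raising it moves the cliff to a larger n; it does not remove it.
(Long/getLong "clojure.core.async.pool-size" 8) ;; => 8 unless overridden at JVM startup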
;; With futures, n can be made roughly as large as the number of threads the JVM
;; can spawn, and the time to collect all values from the queue stays at ~2 seconds.
(time (let [n 1000]
        (let [pipeline-queue (atom clojure.lang.PersistentQueue/EMPTY)
              vals-coll (atom [])]
          (dotimes [i n] (future (Thread/sleep 2000) ;; add blocking
                                 (swap! pipeline-queue conj i)))
          ;; in this case the takes don't block, so we need to wait for the elements
          ;; to be added to the queue before draining it
          (Thread/sleep 2000)
          (dotimes [_ n]
            (swap! vals-coll conj (peek @pipeline-queue))
            (swap! pipeline-queue pop))
          ;; make sure all values made it through the pipe
          (assert (= (set (range n)) (set @vals-coll))))))
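;; A hedged variant (not in the original gist): rather than sleeping a fixed 2 seconds
;; and hoping every future has finished, keep the futures and deref them. deref blocks
;; until each future completes, so the take loop only starts once the queue is full.
(time (let [n 1000
            pipeline-queue (atom clojure.lang.PersistentQueue/EMPTY)
            vals-coll (atom [])]
        (let [futs (doall (for [i (range n)]
                            (future (Thread/sleep 2000) ;; add blocking
                                    (swap! pipeline-queue conj i))))]
          (run! deref futs) ;; wait for every future to finish
          (dotimes [_ n]
            (swap! vals-coll conj (peek @pipeline-queue))
            (swap! pipeline-queue pop))
          (assert (= (set (range n)) (set @vals-coll))))))
;; still ~2 seconds, without guessing how long to wait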
;; This demonstrates that we should strongly favour real threads (future, clojure.core.async/thread, etc.)
;; over go blocks when we need effective parallel processing. This is especially true when
;; the body of the thread/go block performs a blocking task (sleep, file I/O, etc.).
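;; A hedged follow-up sketch (not part of the original gist): the blocking go example
;; above with clojure.core.async/thread in place of go. Each body runs on its own
;; thread from a cached pool, so the 2-second sleeps overlap and the total should
;; stay near ~2 seconds for any n the JVM can spawn threads for (not timed here).
(time (let [n 17
            vals-coll (atom [])]
        (let [pipeline-chan (a/chan 1)]
          (dotimes [i n] (a/thread (Thread/sleep 2000)       ;; blocking is fine on a real thread
                                   (a/>!! pipeline-chan i))) ;; blocking put instead of parking put
          (dotimes [_ n] (let [v (a/<!! pipeline-chan)]
                           (swap! vals-coll conj v)))
          (assert (= (set (range n)) (set @vals-coll))))))
;; If the wait can itself be expressed as a parking operation, e.g.
;; (a/<! (a/timeout 2000)) instead of (Thread/sleep 2000), then plain go blocks also
;; scale, because a parked go block releases its dispatch thread while it waits.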