;; Gist enforser/80f8b3bce641078255fd27ff895fc40f (last active December 6, 2018 19:14)
;; This file demonstrates a common misunderstanding about how clojure.core.async
;; go blocks handle parallel processing.
(require '[clojure.core.async :as a])
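;; A minimal sketch (not part of the original gist) that makes the point above
;; concrete: it records the name of the thread that runs each of 20 go blocks.
;; Assuming core.async's default go pool of 8 threads, at most 8 distinct thread
;; names show up, even though 20 go blocks were started.
;; go-thread-names is a hypothetical name used only for this illustration.
(require '[clojure.core.async :as a])

(def go-thread-names
  (let [names (atom #{})
        done  (a/chan 20)]            ; buffer of 20 so the puts never park
    (dotimes [_ 20]
      (a/go
        (swap! names conj (.getName (Thread/currentThread)))
        (a/>! done :ok)))
    ;; wait until all 20 go blocks have reported in
    (dotimes [_ 20] (a/<!! done))
    @names))

(count go-thread-names) ;; at most 8 with the default go pool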
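;; Example 1: go blocks doing no blocking work.
;; Each go block puts one value onto a channel with a buffer of 1, and the main
;; thread takes all n values with a blocking take (<!!).
;; Keep n at or below 1024: core.async allows at most 1024 pending puts on a
;; single channel, and the extra puts here park until the main thread takes.
(time (let [n 1000
            vals-coll (atom [])
            pipeline-chan (a/chan 1)]
        (dotimes [i n]
          (a/go (a/>! pipeline-chan i)))
        (dotimes [_ n]
          (swap! vals-coll conj (a/<!! pipeline-chan)))
        ;; make sure all values made it through the pipe
        (assert (= (set (range n)) (set @vals-coll)))))
;; approximately 36ms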
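;; Example 2: the same pipeline built from futures and an atom holding a
;; PersistentQueue.
(time (let [n 1000
            pipeline-queue (atom clojure.lang.PersistentQueue/EMPTY)
            vals-coll (atom [])
            ;; each future conjs one value; keep the futures so we can wait on them
            producers (mapv (fn [i] (future (swap! pipeline-queue conj i)))
                            (range n))]
        ;; wait for every producer first; otherwise the loop below can peek an
        ;; empty queue (getting nil) before all futures have run
        (run! deref producers)
        (dotimes [_ n]
          (swap! vals-coll conj (peek @pipeline-queue))
          (swap! pipeline-queue pop))
        ;; make sure all values made it through the pipe
        (assert (= (set (range n)) (set @vals-coll)))))
;; approximately 8ms
;; The futures version is roughly 4-5 times faster, but both totals are very small.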
;;; The next section reuses the two examples above, but each future/go block
;;; now performs a blocking call (Thread/sleep 2000) before handing off its value.
;; With blocking added to the go blocks, the total time grows by ~2 seconds for
;; every additional 8 go blocks. core.async runs go blocks on a fixed pool of
;; 8 threads by default, and Thread/sleep blocks the pool thread itself (it does
;; not park), so only 8 go blocks can make progress at once. Each batch of 8
;; must finish sleeping before the next batch can start, i.e.:
;;  1 <= n <= 8  => ~2 seconds
;;  9 <= n <= 16 => ~4 seconds
;; 17 <= n <= 24 => ~6 seconds
;; 25 <= n <= 32 => ~8 seconds
;; ...
(time (let [n 17
            vals-coll (atom [])
            pipeline-chan (a/chan 1)]
        (dotimes [i n]
          (a/go (Thread/sleep 2000) ;; add blocking: occupies a go pool thread
                (a/>! pipeline-chan i)))
        (dotimes [_ n]
          (swap! vals-coll conj (a/<!! pipeline-chan)))
        ;; make sure all values made it through the pipe
        (assert (= (set (range n)) (set @vals-coll)))))
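;; A small worked version of the batching arithmetic above (a sketch, not from
;; the original gist): with p go threads and a blocking sleep of s seconds per go
;; block, n go blocks finish in about ceil(n / p) * s seconds. The pool size is
;; read from the clojure.core.async.pool-size system property (default 8), which
;; must be set before core.async creates its pool, typically at JVM startup,
;; e.g. -Dclojure.core.async.pool-size=16.
;; go-pool-size and expected-go-seconds are hypothetical helper names.
(defn go-pool-size []
  (Long/parseLong (System/getProperty "clojure.core.async.pool-size" "8")))

(defn expected-go-seconds
  "Approximate wall-clock seconds for n go blocks that each block for
  sleep-s seconds on a go pool of pool-size threads."
  [n pool-size sleep-s]
  ;; integer ceiling of n / pool-size, times the per-batch sleep
  (* sleep-s (quot (+ n pool-size -1) pool-size)))

(expected-go-seconds 17 8 2) ;; => 6, matching the n = 17 example above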
;; future runs each task on an unbounded, cached thread pool, so every blocking
;; future gets its own thread. n can therefore be raised to roughly the number of
;; threads the JVM can create, and collecting all values still takes ~2 seconds.
(time (let [n 1000
            pipeline-queue (atom clojure.lang.PersistentQueue/EMPTY)
            vals-coll (atom [])
            producers (mapv (fn [i]
                              (future (Thread/sleep 2000) ;; add blocking
                                      (swap! pipeline-queue conj i)))
                            (range n))]
        ;; peek/pop never block, so wait for every producer to finish before
        ;; draining; a fixed (Thread/sleep 2000) here can race with the
        ;; futures' own 2-second sleeps
        (run! deref producers)
        (dotimes [_ n]
          (swap! vals-coll conj (peek @pipeline-queue))
          (swap! pipeline-queue pop))
        ;; make sure all values made it through the pipe
        (assert (= (set (range n)) (set @vals-coll)))))
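;; For contrast with go blocks (a sketch, not from the original gist): futures
;; that block at the same time each get their own thread from future's unbounded
;; pool. Here 20 futures sleep briefly so they overlap, and we collect the names
;; of the threads they ran on. You should see 20 distinct names, well above the
;; 8 threads of the default go pool.
;; future-thread-names is a hypothetical name used only for this illustration.
(def future-thread-names
  (let [tasks (mapv (fn [_]
                      (future
                        (Thread/sleep 200) ; keep every task alive at once
                        (.getName (Thread/currentThread))))
                    (range 20))]
    ;; deref blocks until each future has finished and returns its thread name
    (set (map deref tasks))))

(count future-thread-names)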
;; This demonstrates that for parallel work we should strongly favour real
;; threads (future, clojure.core.async/thread, etc.) over go blocks. Go blocks
;; share a small, fixed thread pool and are meant for lightweight coordination
;; through parking channel operations (>!, <!). This matters most when the work
;; itself blocks (Thread/sleep, file I/O, etc.).
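;; A sketch of the recommended alternative (not from the original gist): the
;; blocking go-block example rewritten with clojure.core.async/thread. a/thread
;; runs its body on its own thread and returns a channel that receives the
;; body's result, so all n sleeps can happen at the same time. A bounded variant
;; uses a/pipeline-blocking. collect-with-threads and collect-with-pipeline are
;; hypothetical helper names. a/to-chan! assumes a core.async release that
;; provides it (older releases name it a/to-chan).
(require '[clojure.core.async :as a])

(defn collect-with-threads
  "Starts n blocking tasks with a/thread and returns the set of their results."
  [n sleep-ms]
  (let [result-chans (mapv (fn [i]
                             (a/thread (Thread/sleep sleep-ms) ; blocking is fine here
                                       i))
                           (range n))]
    ;; each a/thread channel yields its body's value once, then closes
    (set (map a/<!! result-chans))))

;; Bounded alternative: pipeline-blocking applies the transducer on up to
;; `parallelism` dedicated threads and keeps results in input order.
(defn collect-with-pipeline
  [n sleep-ms parallelism]
  (let [from (a/to-chan! (range n))   ; closes after the last value
        to   (a/chan n)]
    (a/pipeline-blocking parallelism
                         to
                         (map (fn [i] (Thread/sleep sleep-ms) i))
                         from)
    ;; `to` is closed once `from` is exhausted, so a/into can complete
    (a/<!! (a/into [] to))))

(time (collect-with-threads 17 2000)) ;; ~2 seconds, versus ~6 seconds for the go-block version with n = 17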