Last active
May 5, 2019 13:34
Parallel stateful transducers with fold
; The following is an attempt to answer the question: how do you use stateful transducers with fork-join r/fold, and
; what should you expect? Feedback appreciated. Feel free to discuss on Clojurians Slack @reborg.

; Stateful transducers (for example drop, partition-by, dedupe) cannot be used reliably in parallel with fold.
; They produce inconsistent results, for example:
(require '[clojure.core.reducers :as r])

(distinct
  (for [i (range 500)]
    (r/fold
      ((drop 10) +)
      (vec (range 1600)))))
;; (1279155 1271928 1275949 1271127 1277573)
; It's a known problem. Stateful transducers don't play well with concurrency. Even assuming concurrency was not the
; problem, there is also a semantic question: the definition of "drop" (skip n elements from the head of a collection)
; cannot be translated *as is* to a parallel context.

; The idea that follows is to define what a stateful transducer means in a parallel context and to fix the concurrency
; issue. The resulting parallel version is consistent, but its meaning is radically different from the sequential
; version. drop in the standard library, for example, has specific semantics: it skips n elements from the head (not
; the middle, not the tail) of the collection. We may still want a fork-join-aware drop that behaves differently: every
; chunk gets its first n elements dropped. This would be equivalent to the following sequential code (using a chunk
; size of 200):
(->> (vec (range 1600))
     (partition 200)
     (mapcat #(drop 10 %))
     (map inc)
     (reduce +))
;; 1224360
; Here the dropped elements from each partition are: (range 10) (range 200 210) (range 400 410) and so on. Our goal is to
; provide a similar behavior in parallel using drop (or other stateful/stateless transducers). If the transducer is
; stateful, our expectation is that it is going to affect each chunk equally.
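; As a sanity check on the number above (a sketch; the var names are only illustrative): the chunked result equals the
; full sum, minus the first 10 elements of every 200-element chunk, plus 1 for each surviving element (from map inc).
(def total-sum   (reduce + (range 1600)))
(def dropped-sum (->> (range 1600) (partition 200) (mapcat #(take 10 %)) (reduce +)))
(def kept-count  (- 1600 (* 8 10)))
[total-sum dropped-sum kept-count (+ (- total-sum dropped-sum) kept-count)]
;; [1279200 56360 1520 1224360]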
; The reason why stateful transducers are inconsistent when run in parallel is how the reducing function closes over
; the volatile/atom. Each fjtask is created around a call to reduce, which in turn uses an enhanced reducing function
; (each transducer defines its own transformation of the reducing function). But every reduce call inside an fjtask
; sees the same volatile/atom/state. Depending on which thread reads the volatile and when, the drop transducer
; example above drops fewer or more items per chunk. The solution is to defer transformations (and thus state
; initialization) until reduce time. We need a few ingredients for this:
; * At composition time, transformations of the reducing function need to be deferred.
; * We need to intercept coll-fold so the deferred transformations can be correctly handled.
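; The shared state can be observed even without threads. A minimal sketch (shared-rf, first-run and second-run are
; illustrative names): applying (drop 10) to + once creates a single counter, and every reduce that reuses the
; resulting function keeps decrementing that same counter.
(def shared-rf ((drop 10) +))
(def first-run  (reduce shared-rf 0 (range 100 120)))  ; counter starts at 10: 100..109 are dropped
(def second-run (reduce shared-rf 0 (range 100 120)))  ; counter is already used up: nothing is dropped
[first-run second-run]
;; [1145 2190]
; Applying the transducer again at reduce time gives each reduce its own counter:
(reduce ((drop 10) +) 0 (range 100 120))
;; 1145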
; It's much easier to see than to explain:
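;; Applies each deferred xf, in order, to the reducing function at the head of the vector.
;; A plain (non-deferred) reducing function is returned unchanged, so ordinary r/fold calls keep working
;; after coll-fold is overridden below.
(defn compose [fns] (if (vector? fns) (reduce #(%2 %1) fns) fns))

;; Stores rf and xf as data ([rf xf ...]) instead of calling (xf rf) right away.
(defn defer [xf rf] (if (vector? rf) (conj rf xf) (conj [] rf xf)))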
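
; A minimal sketch of what the two helpers do outside of fold. defer-sketch and compose-sketch are hypothetical local
; copies of defer and compose, repeated here only so the snippet runs on its own.
(def deferred-demo
  (let [defer-sketch   (fn [xf rf] (if (vector? rf) (conj rf xf) [rf xf]))
        compose-sketch (fn [fns] (reduce #(%2 %1) fns))
        deferred       (defer-sketch (drop 2) +)]
    ;; deferred is plain data, [+ xf]: the xf has not been applied, so no state exists yet.
    [(count deferred)
     ;; Each compose-sketch call applies the xf to + and creates fresh state,
     ;; so both reductions drop their own first two items.
     (reduce (compose-sketch deferred) 0 [1 2 3 4])
     (reduce (compose-sketch deferred) 0 [1 2 3 4])]))
deferred-demo
;; [2 7 7]
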
(defn foldvec*
  [v n combinef reducef]
  (cond
    (empty? v) (combinef)
    (<= (count v) n) (r/reduce (compose reducef) (combinef) v)
    :else
    (let [split (quot (count v) 2)
          v1 (subvec v 0 split)
          v2 (subvec v split (count v))
          fc (fn [child] #(foldvec* child n combinef reducef))]
      (#'r/fjinvoke
        #(let [f1 (fc v1)
               t2 (#'r/fjtask (fc v2))]
           (#'r/fjfork t2)
           (combinef (f1) (#'r/fjjoin t2)))))))
(extend-protocol r/CollFold
  Object
  (coll-fold
    [coll n combinef reducef]
    (reduce (compose reducef) (combinef) coll))

  clojure.lang.IPersistentVector
  (coll-fold
    [v n combinef reducef]
    (foldvec* v n combinef reducef)))

;; Parallel: drop 10 from each chunk, increment the rest.
(r/fold 200 + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))
;; 1224360

;; Consistent: every run returns the same result.
(distinct
  (for [i (range 500)]
    (r/fold 200 + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))))
;; (1224360)

;; Warning: changing the chunk size changes the result (without an explicit size, r/fold uses 512).
(r/fold + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))
;; 1256580

;; A non-vector collection falls back to the Object implementation: one sequential reduce, so drop applies only once.
(r/fold 200 + (defer (comp (drop 10) (map inc)) +) (range 1600))
;; 1280745
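
; The results above can be reproduced sequentially. This is only a sketch: chunks-sketch and per-chunk-sum-sketch are
; hypothetical helpers (not part of clojure.core.reducers). chunks-sketch halves a vector the same way foldvec* does,
; until a piece has at most n elements; per-chunk-sum-sketch then drops 10 from each piece and increments the rest.
(defn chunks-sketch [v n]
  (if (<= (count v) n)
    [v]
    (let [split (quot (count v) 2)]
      (into (chunks-sketch (subvec v 0 split) n)
            (chunks-sketch (subvec v split) n)))))

(defn per-chunk-sum-sketch [v n]
  (transduce (comp (mapcat #(drop 10 %)) (map inc)) + (chunks-sketch v n)))

(map count (chunks-sketch (vec (range 1600)) 512))
;; (400 400 400 400)
(per-chunk-sum-sketch (vec (range 1600)) 512)
;; 1256580
(per-chunk-sum-sketch (vec (range 1600)) 200)
;; 1224360
(per-chunk-sum-sketch (vec (range 1600)) 1600)  ; a single chunk, like the sequential case
;; 1280745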

;; gran finale
(defn xform [rf]
  (defer
    (comp (map inc)
          (filter even?)
          (dedupe)
          (mapcat range)
          (partition-all 3)
          (partition-by #(< (apply + %) 7))
          (mapcat flatten)
          (random-sample 1.0)
          (take-nth 1)
          (keep #(when (odd? %) (* % %)))
          (keep-indexed #(when (even? %1) (* %1 %2)))
          (replace {2 "two" 6 "six" 18 "eighteen"})
          (take 11)
          (take-while #(not= 300 %))
          (drop 1)
          (drop-while string?)
          (remove string?))
    rf))

(r/fold + (xform +) (vec (interleave (range 180000) (range 200000))))
;; 105440856

; Summary: if you need to use a stateless transducer chain in parallel, feel free to use fold. If the chain contains
; stateful transducers, plain fold gives inconsistent results. If you can translate the semantics of the transducer
; chain so that it applies to each chunk, use the deferred/compose approach above.