Parallel stateful transducers with fold
; The following is an attempt to answer the question: how to use stateful transducers with fork-join r/fold, and what
; you should expect. Feedback appreciated. Feel free to discuss on Clojurians slack @reborg.
; Stateful transducers (for example drop, partition-by, dedupe, etc.) cannot be used in parallel with fold:
; they produce inconsistent results, for example:
(require '[clojure.core.reducers :as r])
(distinct
  (for [i (range 500)]
    (r/fold
      ((drop 10) +)
      (vec (range 1600)))))
;; (1279155 1271928 1275949 1271127 1277573)
; It's a known problem: stateful transducers don't play well with concurrency. But even if concurrency were not an
; issue, there is also a semantic question. The definition of drop, "skip n elements from the head of a collection",
; cannot be translated *as is* to a parallel context.
; The idea that follows is to give stateful transducers a definition in a parallel context and to fix the concurrency
; issue. The resulting parallel version is consistent, but its meaning is radically different from the sequential
; version. drop in the standard library has a specific semantic: it skips n elements from the head (not the middle,
; not the tail) of the collection. We might nonetheless want to define a fork-join-aware behavior of drop which is
; different: every chunk gets its first n elements dropped. This would be equivalent to the following sequential
; code (using a 200-element chunk size):
(->> (vec (range 1600))
     (partition 200)
     (mapcat #(drop 10 %))
     (map inc)
     (reduce +))
;; 1224360
; Here the dropped elements from each partition are (range 10), (range 200 210), (range 400 410) and so on (see the
; quick check below). Our goal is to provide similar behavior in parallel using drop (or other stateful/stateless
; transducers). If the transducer is stateful, our expectation is that it affects each chunk equally.
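; A quick illustrative check of which elements get dropped (the first 10 of each 200-element partition):
(->> (range 1600)
     (partition 200)
     (mapcat #(take 10 %))
     (take 15))
;; (0 1 2 3 4 5 6 7 8 9 200 201 202 203 204)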
; The reason stateful transducers are inconsistent when run in parallel is how the reducing function closes over the
; volatile/atom. Each fjtask gets created around a call to reduce, which in turn uses an enhanced reducing function
; (each transducer defines its own transformed reducing function). But every reduce call inside every fjtask sees the
; same volatile/atom/state. Depending on which thread reads the volatile and when, the drop transducer example above
; drops more or fewer items per chunk (a simplified sketch of the problem follows the list below). The solution is to
; defer transformations (and thus state initialization) until reduce-time. We need a few ingredients for this:
; * At composition time, transformations of the reducing function need to be deferred.
; * We need to intercept coll-fold so the deferred transformations can be correctly handled.
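; To see where the shared state comes from, here is a simplified drop-like stateful transducer (modeled on
; clojure.core/drop; illustrative, not necessarily the exact core source):
(defn drop-xf [n]
  (fn [rf]
    (let [nv (volatile! n)] ; state is created here, once per (xf rf) application
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (let [n @nv]
           (vswap! nv dec)
           (if (pos? n)
             result
             (rf result input))))))))
; fold applies ((drop-xf 10) +) once, before any splitting, so every parallel reduce over every chunk shares
; that single volatile.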
; It's much easier to see than to explain:
;; compose rebuilds the final reducing function from a deferred vector [rf xf1 xf2 ...];
;; non-deferred reducing functions pass through unchanged, so plain r/fold calls keep working.
(defn compose [fns]
  (if (vector? fns)
    (reduce #(%2 %1) fns)
    fns))
;; defer stacks a transducer on a reducing function without applying it (yet).
(defn defer [xf rf]
  (if (vector? rf)
    (conj rf xf)
    (conj [] rf xf)))
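; To illustrate (hypothetical REPL session; defer only records the transformation, compose applies it later):
;; (defer (map inc) +)                    ;=> [+ (map inc)]
;; (defer (drop 10) (defer (map inc) +))  ;=> [+ (map inc) (drop 10)]
;; ((compose (defer (map inc) +)) 1 2)    ;=> 4, same as (((map inc) +) 1 2)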
;; Like the private r/foldvec, but reducef is a deferred vector: (compose reducef)
;; runs inside each leaf reduce, so every chunk builds its own reducing function
;; (and thus fresh transducer state). fjinvoke/fjtask/fjfork/fjjoin are private in
;; clojure.core.reducers, hence the #' var access.
(defn foldvec*
  [v n combinef reducef]
  (cond
    (empty? v) (combinef)
    (<= (count v) n) (r/reduce (compose reducef) (combinef) v)
    :else
    (let [split (quot (count v) 2)
          v1 (subvec v 0 split)
          v2 (subvec v split (count v))
          fc (fn [child] #(foldvec* child n combinef reducef))]
      (#'r/fjinvoke
        #(let [f1 (fc v1)
               t2 (#'r/fjtask (fc v2))]
           (#'r/fjfork t2)
           (combinef (f1) (#'r/fjjoin t2)))))))
;; Route every fold through the deferred-aware versions.
(extend-protocol r/CollFold
  Object
  (coll-fold
    [coll n combinef reducef]
    (reduce (compose reducef) (combinef) coll))
  clojure.lang.IPersistentVector
  (coll-fold
    [v n combinef reducef]
    (foldvec* v n combinef reducef)))
;; parallel: drop 10 from each chunk, increment the rest.
(r/fold 200 + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))
;; 1224360
;; consistent: all 500 runs now produce the same 1224360
(distinct
  (for [i (range 500)]
    (r/fold 200 + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))))
;; Warning: changing the chunk size changes the result. Without an explicit chunk size,
;; fold defaults to n=512, so the 1600-element vector splits into four 400-element
;; leaves and the dropped elements become (range 10), (range 400 410), (range 800 810)
;; and (range 1200 1210).
(r/fold + (defer (comp (drop 10) (map inc)) +) (vec (range 1600)))
;; 1256580
;; Also sequential with fold, just in case: (range 1600) is not a vector, so coll-fold
;; dispatches on Object and falls back to a single sequential reduce.
(r/fold 200 + (defer (comp (drop 10) (map inc)) +) (range 1600))
;; 1280745
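; Equivalent plain sequential check (illustrative):
(reduce + (map inc (drop 10 (range 1600))))
;; 1280745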
;; gran finale
(defn xform [rf]
  (defer
    (comp (map inc)
          (filter even?)
          (dedupe)
          (mapcat range)
          (partition-all 3)
          (partition-by #(< (apply + %) 7))
          (mapcat flatten)
          (random-sample 1.0)
          (take-nth 1)
          (keep #(when (odd? %) (* % %)))
          (keep-indexed #(when (even? %1) (* %1 %2)))
          (replace {2 "two" 6 "six" 18 "eighteen"})
          (take 11)
          (take-while #(not= 300 %))
          (drop 1)
          (drop-while string?)
          (remove string?))
    rf))
(r/fold + (xform +) (vec (interleave (range 180000) (range 200000))))
;; 105440856
; Summary: if you need to use a transducer chain in parallel, feel free to use fold. If the chain contains stateful
; transducers, the results will be inconsistent. If you can translate the transducer chain's semantics to apply to
; each chunk independently, use the deferred/compose approach above.
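; As a template (illustrative; chunk-size, combinef, xf1, xf2 and rf are placeholders):
;; (r/fold chunk-size combinef (defer (comp xf1 xf2) rf) (vec coll))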