@cgrand
Last active February 2, 2018 15:56
Chunked transducer
; There are some obvious micro-optimizations; I left them out for clarity (see the other file for an optimized version).
; The relative ordering of reads and writes on the volatile and the plain array should be thread-safe (if not, point it out).
; @wagjo asked "Have you found use for such concept? Must be pretty slow compared to unchunked one"
; The idea came out of a discussion on transducers so not used for real, yet.
; Once you optimize it (remove the boxing induced by the volatile, switch to unchecked math) there should not be much
; of an overhead.
; When you have one big composed reducing fn (e.g. several mapping stages), each item is processed by every
; stage in turn, and each stage may evict from the data cache what the previous stages were using. So you get cache
; misses for each item.
; By chunking the processing, a batch of items is processed by a single stage before being passed to
; the next stage, which reduces cross-stage cache thrashing.
; Furthermore, smaller "loop" bodies may be more JIT-friendly.
; Theoretically, instruction cache thrashing may also be an issue with big fns. In practice it's rare to have I$ thrashing
; without D$ thrashing.
(defn chunked
  ([] (chunked 32))
  ([n]
   (fn [f1]
     (let [xs (object-array n)
           nv (volatile! 0)
           flush (fn [acc]
                   (loop [i 0 acc acc]
                     (if (< i @nv) ; read of the volatile BEFORE read of the array
                       (let [acc (f1 acc (aget xs i))]
                         (if (reduced? acc)
                           acc
                           (recur (inc i) acc)))
                       acc)))]
       (fn
         ([] (f1))
         ([acc] (f1 (unreduced (flush acc)))) ; flush may return a reduced value: unwrap it before completing
         ([acc x]
          (if (< @nv n)
            (do
              (aset xs @nv x)
              (vswap! nv inc) ; write of the volatile AFTER write of the array
              acc)
            (let [acc (flush acc)]
              (if (reduced? acc)
                (do (vreset! nv 0) acc) ; downstream is done: drop x and the buffer
                (do
                  (aset xs 0 x)
                  (vreset! nv 1)
                  acc))))))))))
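; A rough way to observe the batching described in the comments above, using only clojure.core. This is a sketch, not
; the implementation above: (comp (partition-all n) cat) stands in for the chunking step (it allocates a vector per
; batch and emits as soon as n items are buffered), and trace-order and tag are hypothetical names used for illustration.

```clojure
;; Sketch only: `trace-order` and `tag` are hypothetical names, not part of the gist.
(defn trace-order
  "Runs xs through two tagged mapping stages, optionally separated by a
  batching step built from partition-all + cat, and returns the order in
  which the two stages saw the items."
  [batch-size xs]
  (let [log   (atom [])
        ;; a mapping stage that records [stage item] and passes the item through
        tag   (fn [stage] (map (fn [x] (swap! log conj [stage x]) x)))
        xform (if batch-size
                (comp (tag :a) (partition-all batch-size) cat (tag :b))
                (comp (tag :a) (tag :b)))]
    ;; the reducing fn ignores every item; only the log matters here
    (transduce xform (fn ([acc] acc) ([acc _] acc)) nil xs)
    @log))

(trace-order nil [1 2 3 4])
;; => [[:a 1] [:b 1] [:a 2] [:b 2] [:a 3] [:b 3] [:a 4] [:b 4]]

(trace-order 2 [1 2 3 4])
;; => [[:a 1] [:a 2] [:b 1] [:b 2] [:a 3] [:a 4] [:b 3] [:b 4]]
```

; Without batching, the log interleaves :a and :b for each item; with a batch size of 2, stage :a runs on two items
; before stage :b sees either of them.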
(ns chunked-transducers.core)

(deftype Chunked [^:volatile-mutable ^int nv ^objects xs f1]
  clojure.lang.IFn
  (invoke [_] (f1))
  (invoke [_ acc]
    (loop [i (int 0) acc acc]
      (if (< i nv) ; read of the volatile BEFORE read of the array
        (let [acc (f1 acc (aget xs i))]
          (if (reduced? acc)
            (f1 @acc) ; downstream is done: complete with the unwrapped value
            (recur (unchecked-inc-int i) acc)))
        (f1 acc))))
  (invoke [_ acc x]
    (if (< nv (alength xs))
      (do
        (aset xs nv x)
        (set! nv (unchecked-inc-int nv)) ; write of the volatile AFTER write of the array
        acc)
      (let [acc (loop [i (int 0) acc acc]
                  (if (< i nv) ; read of the volatile BEFORE read of the array
                    (let [acc (f1 acc (aget xs i))]
                      (if (reduced? acc)
                        acc
                        (recur (unchecked-inc-int i) acc)))
                    acc))]
        (if (reduced? acc)
          (do (set! nv (int 0)) acc) ; downstream is done: drop x and the buffer
          (do
            (aset xs 0 x)
            (set! nv (int 1))
            acc))))))

(defn chunked
  ([] (chunked 32))
  ([n]
   (fn [f1] (Chunked. 0 (object-array n) f1))))
(def t1 (zipmap (range 1e6) (repeatedly #(rand-int 1e6))))
(def t2 (zipmap (range 1e6) (repeatedly #(rand-int 1e6))))
(def input (vec (take 1e6 (repeatedly #(rand-int 1e3)))))
; (assumes crit is an alias for criterium.core, e.g. (require '[criterium.core :as crit]))
;=> (crit/quick-bench
; (transduce (comp (map t1) (map t2)) +
; input))
;WARNING: Final GC required 36.63001721835632 % of runtime
;Evaluation count : 6 in 6 samples of 1 calls.
; Execution time mean : 373,552665 ms
; Execution time std-deviation : 9,738724 ms
; Execution time lower quantile : 364,651998 ms ( 2,5%)
; Execution time upper quantile : 384,795998 ms (97,5%)
; Overhead used : 2,180775 ns
;nil
;=> (crit/quick-bench
; (transduce (comp (map t1) (chunked 2048) (map t2)) +
; input))
;WARNING: Final GC required 42.808313289048996 % of runtime
;Evaluation count : 6 in 6 samples of 1 calls.
; Execution time mean : 316,616331 ms
; Execution time std-deviation : 4,052901 ms
; Execution time lower quantile : 310,106998 ms ( 2,5%)
; Execution time upper quantile : 320,407998 ms (97,5%)
; Overhead used : 2,180775 ns
;nil
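; To put the two means above side by side, here is a small sketch of the arithmetic. relative-saving is a hypothetical
; helper; the figures are the quick-bench means reported above, with decimal commas read as decimal points. Each figure
; comes from a single quick-bench run with a large share of time spent in GC, so treat the result as indicative only.

```clojure
;; Sketch only: `relative-saving` is a hypothetical helper, not part of the gist.
(defn relative-saving
  "Fraction of the baseline time saved by the candidate."
  [baseline-ms candidate-ms]
  (/ (- baseline-ms candidate-ms) baseline-ms))

;; unchunked mean vs. mean with (chunked 2048) between the two mapping stages
(relative-saving 373.552665 316.616331)
;; about 0.15, i.e. the chunked pipeline took roughly 15% less time in this run
```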