
@reborg
Last active November 28, 2017 03:03
Clojure parallel-lazy merge-sort
;; PARALLEL LAZY MERGE-SORT
;; Sorts large datasets (ones that would not otherwise fit into
;; memory) in parallel on a single machine.
;; The data to fetch is identified by a range of IDs. The range is
;; split into chunks that are processed in parallel on a fork/join
;; thread pool (via reducers). A protocol defines the policy for
;; fetching the data in a given ID range. Each chunk is sorted and
;; saved to disk, and each thread returns a file handle pointing at
;; a temp file that contains its sorted chunk. Finally, the list of
;; file handles is used to lazily merge the pre-sorted chunks while
;; maintaining order. The provided fetch-ids implementation is only
;; for testing: it generates shuffled integer ranges.
(require '[clojure.java.io :as io])
(require '[clojure.core.reducers :as r])
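(defn- save-chunk!
  "Prints data to a new temp file and returns the java.io.File."
  [data]
  (let [file (java.io.File/createTempFile "mergesort-" ".tmp")]
    (with-open [fw (io/writer file)]
      ;; Disable *print-length*/*print-level* so that a REPL setting
      ;; cannot truncate the chunk with "..." and corrupt it on disk.
      (binding [*out* fw
                *print-length* nil
                *print-level* nil]
        (pr data)))
    file))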
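;; Why printer settings matter for the chunk format: each chunk is
;; written with pr and read back with read-string, so the printed text
;; must be complete. If a REPL has set *print-length*, pr elides
;; elements and writes "..." instead, and that text no longer reads
;; back as the original data. `chunk->str` is a hypothetical helper
;; for illustration only, and the limit of 3 is an arbitrary example.
(defn- chunk->str
  "Prints coll as pr would under the given *print-length*
  (nil means unlimited)."
  [coll print-length]
  (binding [*print-length* print-length]
    (pr-str coll)))
;; (chunk->str (range 10) 3)   ;=> "(0 1 2 ...)"
;; (chunk->str (range 10) nil) ;=> "(0 1 2 3 4 5 6 7 8 9)"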
(defprotocol DataProvider
  (fetch-ids [id-range]
    "Returns the unsorted data identified by id-range."))
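(defn- process-leaf
  "Fetches and sorts one chunk, saves it to disk and returns a vector
  holding the temp file, ready to be combined with the other leaves."
  [id-range sortf]
  (-> (fetch-ids id-range)
      sortf
      save-chunk!
      vector))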
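(defrecord IdRange [from to]
  r/CollFold
  ;; Follows the fork/join pattern clojure.core.reducers uses internally
  ;; to fold vectors. A range of at most n IDs is processed directly.
  ;; A larger range is split in half: one half is forked onto the pool
  ;; and the other is computed in the current thread. fjinvoke, fjtask,
  ;; fjfork and fjjoin are private reducers vars and may change between
  ;; Clojure versions.
  (coll-fold [{:keys [from to] :as id-range} n mergef sortf]
    (if (<= (- to from) n)
      (process-leaf id-range sortf)
      (let [half (+ from (quot (- to from) 2))
            r1 (IdRange. from half)
            r2 (IdRange. half to)
            fc (fn [id-range] #(r/fold n mergef sortf id-range))]
        (#'r/fjinvoke
         #(let [f1 (fc r1)
                t2 (#'r/fjtask (fc r2))]
            (#'r/fjfork t2)
            (mergef (f1) (#'r/fjjoin t2))))))))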
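;; To see which ID ranges end up as leaves, here is a sequential sketch
;; of the halving rule used by coll-fold above. `leaf-ranges` is a
;; hypothetical helper, not part of the original: a range of at most n
;; IDs is a leaf, and anything larger is split at its midpoint.
(defn leaf-ranges
  "Returns the [from to] pairs that coll-fold would process as leaves."
  [from to n]
  (if (<= (- to from) n)
    [[from to]]
    (let [half (+ from (quot (- to from) 2))]
      (into (leaf-ranges from half n)
            (leaf-ranges half to n)))))
;; (leaf-ranges 0 1000 512) ;=> [[0 500] [500 1000]]
;; (leaf-ranges 0 10 3)     ;=> [[0 2] [2 5] [5 7] [7 10]]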
;; Test-only provider: returns the IDs in the range, shuffled.
(extend-type IdRange
  DataProvider
  (fetch-ids [id-range]
    (shuffle (range (:from id-range) (:to id-range)))))
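(defn sort-all
  "Lazily merges already-sorted collections, maintaining order through
  the given comparator (compare by default)."
  ([colls]
   (sort-all compare colls))
  ([cmp colls]
   (lazy-seq
    ;; Drop exhausted collections. Testing with seq instead of
    ;; (some identity (map first colls)) also copes with nil/false
    ;; elements and with empty chunks.
    (when-let [colls (seq (keep seq colls))]
      ;; Order the collections by their head: the first one holds the
      ;; next element, and its tail goes back into the pool.
      (let [[[winner & losers] & others] (sort-by first cmp colls)]
        (cons winner (sort-all cmp (if losers (conj others losers) others))))))))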
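;; sort-all sorts the heads of all k chunks again for every element it
;; emits, which is at least O(k) work per element. A common alternative
;; (swapped in here, not the original design) keeps the chunks in a
;; java.util.PriorityQueue ordered by their first element, so that each
;; element costs O(log k). `merge-sorted` is a hypothetical name. The
;; queue is mutable, so consume the returned seq from a single thread.
(defn merge-sorted
  "Lazily merges already-sorted collections using a priority queue."
  [cmp colls]
  (let [heads (java.util.PriorityQueue.
               (max 1 (count colls))
               ;; Clojure fns implement java.util.Comparator; boolean
               ;; comparators such as > are handled as well.
               (fn [a b] (cmp (first a) (first b))))]
    ;; Seed the queue with every non-empty collection.
    (doseq [c colls
            :let [s (seq c)]
            :when s]
      (.add heads s))
    (letfn [(step []
              (lazy-seq
               (when-let [s (.poll heads)]
                 ;; Put the rest of the winning chunk back, if any.
                 (when-let [more (next s)]
                   (.add heads more))
                 (cons (first s) (step)))))]
      (step))))
;; (merge-sorted compare [[1 4 7] [2 5 8] [3 6 9]]) ;=> (1 2 3 4 5 6 7 8 9)
;; (merge-sorted > [[9 7 2] [8 3 1]])               ;=> (9 8 7 3 2 1)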
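(defn- load-chunk
  "Reads a saved chunk back into memory. The whole chunk is read at
  once, and the merge touches the head of every chunk as soon as it
  starts, so every chunk is loaded at that point."
  [fname]
  ;; Disable #= read-time evaluation while reading the file back.
  (binding [*read-eval* false]
    (read-string (slurp fname))))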
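;; A sketch of a streaming chunk format. The helpers `save-chunk-lines!`
;; and `read-chunk-lines` are hypothetical and not part of the original.
;; Writing one element per line lets a chunk be read lazily with
;; line-seq, so the merge holds roughly one line (plus a read buffer)
;; per chunk rather than whole chunks. Assumption: every element prints
;; on a single line and reads back with clojure.edn. That holds for
;; numbers, strings, keywords, vectors and maps, but not for records.
;; Each chunk keeps a file open until it is exhausted, so the number of
;; chunks must stay below the OS open-file limit.
(require '[clojure.edn :as edn] '[clojure.java.io :as io])
(defn save-chunk-lines!
  "Writes each element of data on its own line of a new temp file."
  [data]
  (let [file (java.io.File/createTempFile "mergesort-" ".tmp")]
    (with-open [w (io/writer file)]
      (binding [*print-length* nil
                *print-level* nil]
        (doseq [x data]
          (.write w (pr-str x))
          (.write w "\n"))))
    file))
(defn read-chunk-lines
  "Returns a lazy seq of the elements saved in file. The reader is
  closed once the last line has been consumed."
  [file]
  (let [rdr (io/reader file)]
    (letfn [(step [lines]
              (lazy-seq
               (if-let [[line & more] (seq lines)]
                 (cons (edn/read-string line) (step more))
                 (do (.close rdr) nil))))]
      (step (line-seq rdr)))))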
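(defn psort
  "Sorts the data identified by id-range in parallel, in chunks of at
  most 512 IDs, and returns a lazy seq that merges the sorted chunks.
  Uses compare unless a comparator cmp is given."
  ([id-range]
   (psort compare id-range))
  ([cmp id-range]
   (->> (r/fold 512 concat (partial sort cmp) id-range)
        (map load-chunk)
        (sort-all cmp))))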
;; Example usage:
;; (take 10 (psort (IdRange. 0 10000000)))
;; 13 secs later on my machine:
;; (0 1 2 3 4 5 6 7 8 9)
;; (take 10 (psort > (IdRange. 0 1000)))
;; (999 998 997 996 995 994 993 992 991 990)