Multi-threaded Spark jobs via Clojure with Claypoole (custom pmap) + mapPartitions glue code
(require '[com.climate.claypoole :as tpool])
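For context, a minimal sketch of the Claypoole calls this glue code relies on: `cp/threadpool` builds a fixed-size pool, `cp/with-shutdown!` tears it down when the body exits, and `cp/pmap` is an eager, ordered parallel map over that pool (passing an integer instead of a pool creates a temporary pool of that size):

```clojure
(require '[com.climate.claypoole :as cp])

;; Run inc across a fixed pool of 4 threads; results come back in input order.
(cp/with-shutdown! [pool (cp/threadpool 4)]
  (doall (cp/pmap pool inc [1 2 3])))
;; => (2 3 4)
```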
;;
;; helper fns
;;
;; NOTE: num-threads, your-fn, and any-args-go-here below are placeholders;
;; def/defn them (or close over real values) before compiling this helper.
(defn your-multi-threaded-spark-fn
  "Create a parallelized version of your-fn using a fixed-size threadpool.
  This fn is created to fit the signature of .mapPartitions in Spark."
  [extra-args-to-set-up-your-fn]
  (fn [elem-iter]
    ;; The "hard part" of this fn is creating an Iterator + Iterable out of a
    ;; Clojure seq. The thread pool management and custom parallel map (pmap)
    ;; call are made easy by the Claypoole library.
    (let [elem-seq (or (seq (iterator-seq elem-iter)) [])]
      (if (empty? elem-seq)
        ;; If we are given an empty Spark partition, then return an empty
        ;; Iterator/Iterable that delegates to Java's canonical empty iterator.
        (let [iter (java.util.Collections/emptyIterator)]
          (proxy [java.util.Iterator java.lang.Iterable] []
            (hasNext [] (.hasNext iter))
            (next [] (.next iter))
            (remove [] (.remove iter))
            (iterator [] iter)))
        ;; ...else, we now know we have a non-empty Spark partition.
        ;; Implementing the Iterator interface means writing code much like a
        ;; 'loop' form iterating over a seq: 'next' peels off the head and the
        ;; atom holds the rest.
        (let [out-iter-rest-cache (atom (tpool/pmap num-threads
                                                    (partial your-fn any-args-go-here)
                                                    elem-seq))]
          (proxy [java.util.Iterator java.lang.Iterable] []
            (remove []
              (throw (UnsupportedOperationException.
                      "You don't want to call 'remove' on an iterator from a Spark mapPartitions call!")))
            (hasNext [] (boolean (seq @out-iter-rest-cache)))
            (next []
              (let [out-iter-next (first @out-iter-rest-cache)]
                (swap! out-iter-rest-cache rest)
                out-iter-next))
            (iterator [] this)))))))
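To exercise the generated fn without a Spark cluster, you can hand it the `.iterator` of any Java collection, just as Spark would for one partition. A hedged sketch, assuming `num-threads`, `your-fn`, and `any-args-go-here` have been bound to something concrete (the names and values here are illustrative, not part of the gist):

```clojure
;; Hypothetical bindings for the placeholders (must exist before the helper
;; above is compiled, since the fn body refers to them as free vars):
(def num-threads 4)
(defn your-fn [multiplier x] (* multiplier x))
(def any-args-go-here 10)

;; Clojure vectors implement java.lang.Iterable, so .iterator stands in for
;; the partition iterator Spark would pass to .mapPartitions.
(let [partition-fn (your-multi-threaded-spark-fn nil)
      out-iter     (partition-fn (.iterator [1 2 3]))]
  (vec (iterator-seq out-iter)))
;; => [10 20 30]  (Claypoole's pmap preserves input order)
```

In a real job, the returned fn still needs to be wrapped in something Spark can serialize as a `FlatMapFunction` (e.g. via AOT compilation or a Spark–Clojure bridge such as sparkling); that wiring is outside the scope of this gist.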