@karlmikko
Last active September 1, 2017 03:32
yet another pmap
;; A drop-in replacement for map.
;; Like pmap, but with a bounded number of threads that is shared with child fmap invocations,
;; making it safe to use in recursive algorithms such as tree walking. Deadlocks are avoided
;; by continuing processing on the current thread if the pool is full.
;;
;; You can also specify the window size, the amount that fmap looks ahead;
;; this makes fmap semi-lazy, like pmap.
;;
;; You don't need to set options: the pool size defaults to the same thread count as pmap (2 + N-CPUs).
;;
;; Implemented using futures and an atom that counts the futures currently running.
;; Because the counter is checked and incremented in two separate steps, concurrent callers can
;; both pass the check, so the actual number of futures may exceed the pool-size provided.
;; I don't expect the overshoot to be large, though.
;;
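(ns future-map)

;; Dynamic vars so that nested fmap calls can share one counter and one set of limits.
(def ^:dynamic *future-counter* nil)
(def ^:dynamic *pool-size* nil)
(def ^:dynamic *window-size* nil)

(defn processors
  []
  (.. Runtime getRuntime availableProcessors))

;; Binds the pool size, window size and counter for body.
;; :size sets both sizes at once; :new-counter forces a fresh counter
;; instead of reusing the one already bound.
(defmacro start-options
  [{:keys [pool-size window-size size new-counter]} & body]
  `(let [psize# (or ~pool-size ~size *pool-size* (+ 2 (processors)))
         wsize# (or ~window-size ~size *window-size* psize#)
         future-counter# (or (when ~new-counter (atom 0)) *future-counter* (atom 0))]
     (with-bindings {#'*pool-size* psize#
                     #'*window-size* (min psize# wsize#)
                     #'*future-counter* future-counter#}
       ~@body)))

;; Items are either futures or plain values computed on the calling thread.
(defn- future-deref
  [x]
  (if (future? x)
    (deref x)
    x))

;; Same look-ahead shape as pmap: before blocking on x, realize the element
;; n positions ahead so that its work has already been started.
(defn- window-ahead
  [n coll]
  (let [step (fn step [[x & xs :as vs] fs]
               (lazy-seq
                 (if-let [s (seq fs)]
                   (cons (future-deref x) (step xs (rest s)))
                   (map future-deref vs))))]
    (step coll (drop n coll))))
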
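(defn fmap
  [f coll]
  (start-options
    {}
    ;; Capture the bound values now; the lazy seq below may be realized
    ;; after the bindings have been popped.
    (let [future-counter *future-counter*
          pool-size *pool-size*
          window-size *window-size*]
      (->> coll
           (map (fn internal-mapper
                  [x]
                  (if (< @future-counter pool-size)
                    (do
                      (swap! future-counter inc)
                      ;; Release the slot even if f throws, so a failure
                      ;; does not shrink the pool permanently.
                      (future (try
                                (f x)
                                (finally
                                  (swap! future-counter dec)))))
                    ;; Pool is full: do the work on the current thread.
                    (f x))))
           (window-ahead window-size)))))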

karlmikko commented Sep 1, 2017

Example usage:

(->> [1 2 3]
     (fmap (fn [x]
             (Thread/sleep 1000)
             x))
     doall
     time)
;; => "Elapsed time: 1000.767487 msecs"
;; => (1 2 3)

(start-options {:size 1}
  (->> [1 2 3]
       (fmap (fn [x]
               (Thread/sleep 1000)
               x))
       doall
       time))
;; => "Elapsed time: 2001.610422 msecs"
;; => (1 2 3)

The last example may seem odd, as one might expect the elapsed time to be 3 seconds. In fact, the pool allows only one future at a time, and once it is full the work continues on the current thread, so the effective concurrency level is 2.
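
The gist's header comment notes that the number of futures can exceed the pool size, because `fmap` reads the counter and increments it in two separate steps. If a strict bound matters, one option is to claim a slot with a compare-and-set loop. The following is a minimal sketch under that assumption, not part of the gist: `try-acquire!` and `run-bounded` are hypothetical names, and only `clojure.core` functions are used.

```clojure
(defn try-acquire!
  "Atomically claims one slot on counter if fewer than limit are taken.
  Returns true when a slot was claimed, false when the pool is full."
  [counter limit]
  (loop []
    (let [n @counter]
      (cond
        (>= n limit) false
        ;; Succeeds only if nobody changed the counter since we read it.
        (compare-and-set! counter n (inc n)) true
        ;; Lost a race with another thread: re-read and try again.
        :else (recur)))))

(defn run-bounded
  "Runs (f x) in a future when a slot is free, otherwise on the calling
  thread. Returns a future in the first case and a plain value in the second."
  [counter limit f x]
  (if (try-acquire! counter limit)
    (future
      (try
        (f x)
        ;; Always give the slot back, even when f throws.
        (finally (swap! counter dec))))
    (f x)))
```

With this shape the check and the increment happen as one atomic step, so the count never exceeds `limit`. The fallback to the calling thread is the same deadlock-avoidance idea that `fmap` uses.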
