public
Last active

  • Download Gist
forkjoin.clj
Clojure
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
(ns fj
(:import [java.util.concurrent RecursiveTask
ForkJoinPool]))
 
;; Ask the compiler to warn about Java interop calls it cannot resolve
;; statically (and would therefore perform via reflection) in this file.
(set! *warn-on-reflection* true)
 
;; -----------------------------------------------
;; Helpers to provide an idiomatic interface to FJ
 
(defprotocol IFJTask
  "Operations on a wrapped fork/join task."
  (fork [this]
    "Arranges for the task to execute asynchronously and returns a task
    handle that can later be passed to `join`.")
  (join [this]
    "Returns the result of the task's computation.")
  (run [this]
    "Executes the task via the underlying task's `invoke` and returns its
    result.")
  (compute [this]
    "Calls the underlying task's `compute` method directly."))
 
;; Adapter exposing a java.util.concurrent.RecursiveTask through the
;; IFJTask protocol. The wrapped task is held in the `task` field.
(deftype FJTask [^RecursiveTask task]
  IFJTask
  ;; Fork the wrapped task, then wrap whatever .fork returns in a fresh
  ;; FJTask so the caller keeps working with the protocol.
  (fork [_]
    (let [forked (.fork task)]
      (FJTask. forked)))
  ;; Result of the wrapped task, as returned by its .join.
  (join [_]
    (.join task))
  ;; `run` maps onto the wrapped task's .invoke.
  (run [_]
    (.invoke task))
  ;; NOTE(review): RecursiveTask.compute is declared protected in the JDK;
  ;; confirm this call resolves without falling back to reflection.
  (compute [_]
    (.compute task)))
 
(defn ^FJTask task*
  "Returns an FJTask whose underlying RecursiveTask computes its value by
  calling the zero-argument function `f`."
  [f]
  ;; RecursiveTask is an abstract class, so a proxy supplies `compute`.
  (let [recursive-task (proxy [RecursiveTask] []
                         (compute [] (f)))]
    (FJTask. recursive-task)))
 
(defmacro task
  "Wraps the `body` forms in a zero-argument fn and passes it to `task*`,
  so the body is only evaluated when the resulting task is computed."
  [& body]
  `(task* (fn [] ~@body)))
 
(defprotocol IFJPool
  "Operations on a wrapped fork/join pool."
  (shutdown [this]
    "Shuts the pool down.")
  (submit [this task]
    "Submits `task` to the pool for execution.")
  (invoke [this task]
    "Invokes `task` on the pool.")
  (execute [this task]
    "Passes `task` to the pool's execute operation."))
 
;; Adapter exposing a java.util.concurrent.ForkJoinPool through the IFJPool
;; protocol. Every method that takes a task expects an FJTask and hands its
;; wrapped RecursiveTask (the `task` field) to the pool.
(deftype FJPool [^ForkJoinPool fjp]
  IFJPool
  ;; Delegate to the wrapped pool. Calling .shutdown on `this` would dispatch
  ;; back to this very method and recurse until the stack overflows, without
  ;; ever shutting the ForkJoinPool down.
  (shutdown [_]
    (.shutdown fjp))
  ;; Protocol method parameters cannot carry a class hint, so each method
  ;; re-binds `task` with an ^FJTask hint to read its field without reflection.
  ;; The ^RecursiveTask hint on the field read picks the matching pool overload.
  (submit [_ task]
    (let [^FJTask task task]
      (.submit fjp
               ^RecursiveTask (.task task))))
  (invoke [_ task]
    (let [^FJTask task task]
      (.invoke fjp
               ^RecursiveTask (.task task))))
  (execute [_ task]
    (let [^FJTask task task]
      (.execute fjp
                ^RecursiveTask (.task task)))))
 
(defn ^FJPool fjpool
  "Creates a ForkJoinPool with its no-argument constructor and returns it
  wrapped in an FJPool."
  []
  (let [java-pool (ForkJoinPool.)]
    (FJPool. java-pool)))
 
;; -----------------------------------------------
;; Fib
 
;; Pool instance created once when this namespace is loaded; the example
;; at the bottom of the file invokes tasks on it.
(def pool (fjpool))
 
(defn fib
  "Returns the n-th Fibonacci number, splitting each step into two tasks:
  the (n-1) branch is forked, the (n-2) branch is run directly, and the
  two results are added. Any n <= 1 is returned unchanged."
  [n]
  (if (<= n 1)
    n
    (let [forked-branch (fork (task (fib (- n 1))))
          ;; The second branch is run only after the first has been forked.
          direct-result (run (task (fib (- n 2))))]
      (+ direct-result (join forked-branch)))))
 
;; Example usage, wrapped in `comment` so it is not evaluated when the file
;; loads: computes (fib 10) as a task invoked on `pool`.
(comment
(invoke pool (task (fib 10)))
)

Nice. Out of interest, why use proxy in task*? Is reify not an option here?

RecursiveTask must be subclassed, so I don't think reify can be used in this case.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.