(ns rebindable-thread-pool
(:import (java.util.concurrent Executors ExecutorService ThreadPoolExecutor ArrayBlockingQueue TimeUnit)))
; fixed thread pool as default
(def ^{:dynamic true}
*thread-pool* (Executors/newFixedThreadPool (+ 2 (.availableProcessors (Runtime/getRuntime)))))
(defn future-call+
"Similar to clojure.core/future-call but with rebindable threadpool."
; the position of the type hint ^Callable ensures that the returned future will return the calculated value
(let [fut (.submit *thread-pool* ^Callable (#'clojure.core/binding-conveyor-fn f))]
(deref [_] (#'clojure.core/deref-future fut))
[_ timeout-ms timeout-val]
(#'clojure.core/deref-future fut timeout-ms timeout-val))
(isRealized [_] (.isDone fut))
(get [_] (.get fut))
(get [_ timeout unit] (.get fut timeout unit))
(isCancelled [_] (.isCancelled fut))
(isDone [_] (.isDone fut))
(cancel [_ interrupt?] (.cancel fut interrupt?)))))
(defmacro future+
"Similar to clojure.core/future but with rebindable threadpool."
[& body] `(future-call+ (^{:once true} fn* [] ~@body)))
(defn shutdown-thread-pool
(.shutdown *thread-pool*))
(defmacro with-bounded-queue-executor
"All future+ calls in the scope of this macro are executed in a bounded queue executor with the given thread count and queue size."
[[thread-count, queue-size], & body]
`(binding [*thread-pool* (ThreadPoolExecutor. ~thread-count, ~thread-count, 0, TimeUnit/MILLISECONDS, (ArrayBlockingQueue. ~queue-size))]
(let [result# ~@body]
