Created
January 2, 2011 00:00
-
-
Save drone29a/762126 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;Steps toward pulling otu composiiton strategy. need to do same for input so calcs ca be push through or pill through. | |
(defn async [f task out] (f task out)) | |
(defn sync [f task out] (out (f task))) | |
(defn in-pool | |
[p f] | |
(fn [& args] | |
(.submit p (fn [] (apply f args))))) | |
;;TODO; unable to shutdown pool. seems recursive fns are not responding to interrupt. http://download.oracle.com/javase/tutorial/essential/concurrency/interrupt.html | |
;;TODO: use another thread to check futures and make sure workers don't fail, don't hang, and call for work within their time limit? | |
(defn queue-work | |
"schedule-work one worker function f per thread. | |
f can either be a fn that is directly applied to each task (all workers use the same fn) or | |
f builds and evals a worker from a fn & args passed over the queue (each worker executes an arbitrary fn.) | |
Examples of the latter are clj-worker and json-wroker. | |
Each worker fn polls the work queue via get-work fn, applies a fn to each dequeued item, puts the result with | |
put-done and recursively checks for more work. If it doesn't find new work, it waits until checking for more work. | |
The workers can run in asynchronous mode, where the put-done function is passed to the worker function f, | |
and f is responsible for ensuring that put-done is appropriately called. | |
Valid values for mode are :sync or :async. If a mode is not specified, queue-work defaults to :sync. | |
All error and fault tolernace should be done by client using plumbing.core." | |
[{:keys [f in out threads exec sleep-time]}] | |
(let [threads (or threads (available-processors)) | |
sleep-time (or sleep-time 5000) | |
exec (or exec sync) | |
pool (Executors/newFixedThreadPool threads) | |
out (if (fn? out) | |
out | |
(fn [k & args] | |
(apply (out k) args))) | |
fns (repeat threads | |
(fn [] | |
(if-let [task (in)] | |
(exec f task (if (= exec async) | |
(in-pool pool out) | |
out)) | |
(Thread/sleep sleep-time)) | |
(recur))) | |
futures (doall (map #(.submit pool %) fns))] | |
pool)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment