@drone29a
Created January 2, 2011 00:00
;; Steps toward pulling out the composition strategy. Need to do the same for input so calcs can be pushed through or pulled through.
(defn async [f task out] (f task out))
(defn sync [f task out] (out (f task)))
(defn in-pool
  [p f]
  (fn [& args]
    (.submit p (fn [] (apply f args)))))
;; TODO: unable to shut down the pool. It seems recursive fns are not responding to interrupt. http://download.oracle.com/javase/tutorial/essential/concurrency/interrupt.html
;;TODO: use another thread to check futures and make sure workers don't fail, don't hang, and call for work within their time limit?
(defn queue-work
  "Schedules one worker function per thread.
  f can either be a fn that is directly applied to each task (all workers use the same fn), or
  f builds and evals a worker from a fn & args passed over the queue (each worker executes an arbitrary fn).
  Examples of the latter are clj-worker and json-worker.
  Each worker polls the work queue via the in fn (get-work), applies f to each dequeued item, puts the result
  with the out fn (put-done), and recurs to check for more work. If it finds no work, it sleeps for
  sleep-time ms (default 5000) before checking again.
  Workers can run in asynchronous mode, where the put-done function is passed to the worker function f,
  and f is responsible for ensuring that put-done is called appropriately.
  The mode is selected with :exec, which takes the sync or async fn. If :exec is not given, queue-work defaults to sync.
  All error handling and fault tolerance should be done by the client using plumbing.core."
  [{:keys [f in out threads exec sleep-time]}]
  (let [threads (or threads (available-processors))
        sleep-time (or sleep-time 5000)
        exec (or exec sync)
        pool (Executors/newFixedThreadPool threads)
        out (if (fn? out)
              out
              (fn [k & args]
                (apply (out k) args)))
        fns (repeat threads
                    (fn []
                      (if-let [task (in)]
                        (exec f task (if (= exec async)
                                       (in-pool pool out)
                                       out))
                        (Thread/sleep sleep-time))
                      (recur)))
        futures (doall (map #(.submit pool %) fns))]
    pool))
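
;; Usage sketch, not part of the original gist. It assumes queue-work above is
;; loaded in a namespace where available-processors and
;; java.util.concurrent.Executors resolve. The in-memory queue, the results
;; atom, the thread count, and the 100 ms sleep are hypothetical choices made
;; only for illustration. It uses the default sync mode.
(comment
  (import '(java.util.concurrent LinkedBlockingQueue ExecutorService))

  ;; Hypothetical work source: a queue pre-filled with four tasks.
  (def work (LinkedBlockingQueue. [1 2 3 4]))
  ;; Hypothetical sink: results are collected in an atom.
  (def results (atom []))

  (def pool
    (queue-work {:f inc
                 :in #(.poll work)              ; returns nil when the queue is empty
                 :out #(swap! results conj %)   ; called with the result of (f task)
                 :threads 2
                 :sleep-time 100}))

  ;; Once the workers have drained the queue, results holds each task
  ;; incremented by one, in whatever order the workers finished.
  (sort @results)

  ;; The workers loop forever, so the pool has to be stopped explicitly.
  ;; shutdownNow interrupts them; the TODO above reports that this did not
  ;; stop the workers reliably in the author's setup.
  (.shutdownNow ^ExecutorService pool))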