Skip to content

Instantly share code, notes, and snippets.

@optevo
Created December 31, 2014 08:44
Show Gist options
  • Save optevo/aa9e64b5bd732544d05d to your computer and use it in GitHub Desktop.
Save optevo/aa9e64b5bd732544d05d to your computer and use it in GitHub Desktop.
Select an execution mode for a task with one parameter (immediate, cached, delayed, agent send, agent send-off) and add priority queues.
(ns easy.scheduler
(:require [clojure.data.priority-map :refer :all]))
(def ^:dynamic *task* identity)
(def result (atom (transient {})))
(def todo (atom (priority-map)))
(deftype Now [o]
clojure.lang.IDeref
(deref [this] o)
clojure.lang.IPending
(isRealized [this] true))
(defn now [o] (Now. o))
(defn next-todo! []
(loop [t0 @todo]
(if-not (empty? t0)
(let [k (first (first t0)) t1 (dissoc t0 k) pass? (compare-and-set! todo t0 t1)]
(if pass? k (recur @todo))))))
(defn process! [p]
(let [k (next-todo!)]
(if k (deliver p (*task* k))) k))
(defn agent-task! [new-key send-fn priority]
(let [p (promise) a (agent p)]
(swap! result assoc! new-key p)
(swap! todo assoc new-key (if priority priority 5))
(send-fn a process!)))
(defn add-task! [new-key mode & [option]]
(cond
(= :now mode) (*task* new-key)
(@result new-key) (throw (IllegalArgumentException. (str "Key already added: " new-key)))
(= :send mode) (agent-task! new-key send option)
(= :send-off mode) (agent-task! new-key send-off option)
(= :cache mode) (swap! result assoc! new-key (now (*task* new-key)))
(= :delay mode) (swap! result assoc! new-key (delay (*task* new-key)))
:else (throw (IllegalArgumentException. (str mode)))))
(defn add-data [k v]
(swap! result assoc! k (now v)))
(defn prioritize! [k f]
(loop [t0 @todo]
(if (contains? t0 k)
(let [p0 (t0 k) p1 (f p0) t1 (assoc t0 k p1) pass? (compare-and-set! todo t0 t1)]
(if-not pass? (recur @todo))))))
(defn get-result [priority-fn k]
(let [r @result p (r k)]
(if (nil? p) (*task* k)
(if (or (realized? p) (delay? p)) @p
(do (prioritize! k priority-fn) @p)))))
(defn get-eventually [k]
(let [r @result p (r k)]
(if (nil? p) (*task* k) @p)))
(def get-now (partial get-result dec))
;------------- Tests
(def workers (atom []))
(defn restart! []
(reset! todo (priority-map))
(apply await @workers)
(reset! result (transient {}))
(reset! workers []))
(defn calculation [x]
(loop [n x f 1]
(if (= n 1) f (recur (dec n) (* f n)))))
(defn simulate-io [_]
(Thread/sleep 3))
(defn agent? [o] (instance? clojure.lang.Agent o))
(defn exec-test [func mode]
(binding [*task* func]
(let [t0 (System/nanoTime)
r (range 1N 1000N)
k1 500N
k2 600N
_ (doseq [n r] (let [w (add-task! n mode (if (= n k2) 0))]
(if (agent? w) (swap! workers conj w))))
t1 (System/nanoTime)
_ (get-now k1)
t2 (System/nanoTime)
_ (get-now k2)
t3 (System/nanoTime)
_ (apply await @workers)
t4 (System/nanoTime)
_ (get-now k1)
t5 (System/nanoTime)
_ (get-now k2)
t6 (System/nanoTime)
_ (doseq [n r] (get-now n))
t7 (System/nanoTime)
_ (Thread/sleep 10)
_ (restart!)
t (fn [s f] (format "%10.2f" (/ (- f s) 100000.0)))]
(str (t t0 t1) " " (t t1 t2) " " (t t2 t3) " " (t t4 t5) " " (t t0 t4) " " (t t5 t6) " " (t t6 t7)))))
(exec-test calculation :now )
(exec-test calculation :cache )
(exec-test calculation :delay )
(exec-test calculation :send )
(exec-test simulate-io :now )
(exec-test simulate-io :cache )
(exec-test simulate-io :delay )
(exec-test simulate-io :send )
(exec-test simulate-io :send-off )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment