Skip to content

Instantly share code, notes, and snippets.

@zcaudate
Created September 8, 2014 05:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zcaudate/a7eb0a4faf3138365607 to your computer and use it in GitHub Desktop.
Save zcaudate/a7eb0a4faf3138365607 to your computer and use it in GitHub Desktop.
(ns hara.concurrent.workflow
(:require [hara.common.checks :refer [hash-map? promise?]]
[hara.common.primitives :refer [uuid]]
[hara.data.nested :refer [merge-nil-nested]]
[clojure.set :as set]))
(defn create-registry
  "Returns a new atom holding an empty workflow registry.

  The registry map has the keys :tickets, :tasks, :running, :upstream and
  :downstream (each an empty map) plus :active (an empty set)."
  []
  (let [empty-tables (zipmap [:tickets :tasks :running :upstream :downstream]
                             (repeat {}))]
    (atom (assoc empty-tables :active #{}))))
;; Default registry, created once when this namespace is loaded.
;; `task` and `either` supply it as the :registry entry when they merge their
;; options. It is declared ^:dynamic, so it can be rebound with `binding`.
(def ^:dynamic *registry* (create-registry))
(defn task?
  "True when `p` satisfies `promise?` and its metadata has :type equal to
  :task. Returns the falsey result of `promise?` otherwise."
  [p]
  (and (promise? p)
       (= (:type (meta p)) :task)))
(defn get-meta
  "Looks up key `k` in the metadata map of `p`; nil when `p` has no metadata
  or the key is absent."
  [p k]
  (get (meta p) k))
(defn task-dependencies
  "Returns the set of ids of those `args` that are tasks (per `task?`), are
  not yet realized, and whose :id is present under :tasks in the registry
  map `reg`. Every other argument is ignored."
  [args reg]
  (let [registered (:tasks reg)
        pending?   (fn [x]
                     (and (task? x)
                          (not (realized? x))
                          (get registered (get-meta x :id))))]
    (set (for [x args
               :when (pending? x)]
           (get-meta x :id)))))
(defn link-upstream
  "Records `id` as a downstream dependent of every id in `upstream`.

  reg      - registry map (a plain value, not the atom)
  id       - id of the dependent task
  upstream - collection of ids that `id` depends on (may be nil or empty)

  Returns the updated registry map. A missing [:downstream upid] entry is
  treated as an empty set, so the value stored there is always a set; a plain
  `conj` onto nil would build a list, which later `disj`/`set/union` calls
  cannot handle."
  [reg id upstream]
  (reduce (fn [acc upid]
            (update-in acc [:downstream upid] (fnil conj #{}) id))
          reg
          upstream))
(defn unlink-upstream
  "Removes `id` from the [:downstream upid] set of every `upid` in
  `upstream` and returns the resulting registry map."
  [reg id upstream]
  (reduce #(update-in %1 [:downstream %2] disj id)
          reg
          upstream))
(defn unlink-downstream
  "Removes `id` from the [:upstream dnid] set of every `dnid` in
  `downstream` and returns the resulting registry map."
  [reg id downstream]
  (reduce #(update-in %1 [:upstream %2] disj id)
          reg
          downstream))
(defn register-task
  "Atomically adds `entry` to the registry atom under `id`.

  The upstream set is computed from `args` with `task-dependencies` against
  the current registry value. `id` is linked as a dependent of each upstream
  id, then the entry, its upstream set and an empty downstream set are
  stored. Returns the new registry value (the result of `swap!`)."
  [registry id entry args]
  (letfn [(add-entry [reg]
            (let [deps   (task-dependencies args reg)
                  linked (link-upstream reg id deps)]
              (-> linked
                  (update-in [:tasks] assoc id entry)
                  (update-in [:upstream] assoc id deps)
                  (update-in [:downstream] assoc id #{}))))]
    (swap! registry add-entry)))
(defn task
  "Creates a promise tagged as a task and registers it.

  Call as (task opts f & args) or (task f & args): when the first argument
  satisfies `hash-map?` it is taken as the options map, otherwise options
  default to {} and the first argument is the function.

  The id is (:id opts) or a fresh `uuid`. The options are merged with
  {:registry *registry*} via `merge-nil-nested`, then :id and :type :task are
  set and the result is merged into the promise's metadata. The entry
  {:id :promise :function :args} is registered in the options' :registry.
  Returns the promise."
  [opts? f & args]
  (let [explicit? (hash-map? opts?)
        opts      (if explicit? opts? {})
        func      (if explicit? f opts?)
        params    (if explicit? args (cons f args))
        id        (or (:id opts) (uuid))
        opts      (assoc (merge-nil-nested opts {:registry *registry*})
                         :id id
                         :type :task)
        p         (vary-meta (promise) merge opts)]
    (register-task (:registry opts)
                   id
                   {:id id :promise p :function func :args params}
                   params)
    p))
(defn either
  "Creates a promise tagged :type :either over two or more choices and
  registers it.

  Call as (either opts t1 t2 ...) or (either t1 t2 ...): when the first
  argument satisfies `hash-map?` it is the options map, otherwise it is the
  first choice. The entry {:id :promise :choices} is registered in the
  options' :registry, with the choices passed as the dependency arguments.
  Returns the promise."
  [opts? task & more]
  (let [explicit? (hash-map? opts?)
        opts      (if explicit? opts? {})
        tasks     (if explicit?
                    (cons task more)
                    (cons opts? (cons task more)))
        id        (or (:id opts) (uuid))
        opts      (assoc (merge-nil-nested opts {:registry *registry*})
                         :id id
                         :type :either)
        p         (vary-meta (promise) merge opts)]
    (register-task (:registry opts)
                   id
                   {:id id :promise p :choices tasks}
                   tasks)
    p))
(defn take-ticket
  "Returns `tickets` with a fresh, undelivered promise stored under `id`.
  Throws an Exception when `id` already maps to a truthy ticket."
  [tickets id]
  (when (get tickets id)
    (throw (Exception. (str "Task " id " already has a ticket"))))
  (assoc tickets id (promise)))
(defn return-ticket
  "Removes every trace of task `id` from the registry atom.

  Inside the swap function the ticket promise for `id` (taken from the
  `tickets` map passed in) is dereferenced first, which blocks until that
  ticket has been delivered. Then `id` is removed from the upstream sets of
  its downstream tasks and from :tickets, :tasks, :running and :active.
  `p` is accepted but not used. Returns the new registry value."
  [tickets id p registry]
  (letfn [(release [reg]
            (deref (get tickets id))
            (let [dependents (get-in reg [:downstream id])
                  unlinked   (unlink-downstream reg id dependents)
                  cleared    (reduce (fn [acc k] (update-in acc [k] dissoc id))
                                     unlinked
                                     [:tickets :tasks :running])]
              (update-in cleared [:active] disj id)))]
    (swap! registry release)))
(defn perform-apply
  "Applies `function` to `args`.

  When (:apply-to opts) is :promise the arguments are passed through as they
  are; otherwise every argument that is a task (per `task?`) is dereferenced
  and replaced by its value first. `p` is accepted but not used."
  [p function args opts]
  (letfn [(resolve-arg [x]
            (if (task? x) (deref x) x))]
    (if (= (:apply-to opts) :promise)
      (apply function args)
      (apply function (map resolve-arg args)))))
;; Forward declaration: perform-task's worker calls trigger-downstream, which
;; is defined further down in this file.
(declare trigger-downstream)
;; Starts the task `id` on a future and returns the updated registry map.
;;
;;   p        - the task's promise; it receives the function's result
;;   registry - the registry atom (handed to the cleanup calls in the worker)
;;   id       - id of the task to run
;;   reg      - the registry map value to update and return
;;   opts     - map merged into the future's metadata
;;
;; When there is no entry under [:tasks id], `reg` is returned unchanged.
;;
;; NOTE(review): `perform` calls this from inside a `swap!` function; if that
;; swap retries, this body (including the `future`) would run again - confirm
;; whether that is acceptable.
(defn perform-task [p registry id reg opts]
(if-let [entry (get-in reg [:tasks id])]
;; take-ticket throws if `id` already holds a ticket; otherwise it adds an
;; undelivered promise for `id`.
(let [tickets (take-ticket (:tickets reg) id)
thrd (-> (future
(try
;; Run the task function and publish its result on the promise.
(deliver p (perform-apply p (:function entry) (:args entry) (meta p)))
;; Runs whether or not the function threw: remove the task from the
;; registry, then trigger its downstream tasks.
(finally (return-ticket tickets id p registry)
(trigger-downstream registry id))))
(vary-meta merge opts))
;; return-ticket derefs this ticket, so the worker's cleanup blocks until the
;; ticket has been delivered here.
_ (deliver (get tickets id) thrd)]
(-> reg
(assoc :tickets tickets)
(assoc-in [:running id] thrd)
(assoc-in [:tasks id :running] true)))
reg))
(defn all-upstream
  "Returns the set of all ids transitively upstream of a task.

  ([p])             - reads :id and :registry (an atom) from the metadata of p
  ([id reg])        - `reg` is a registry map value
  ([id reg output]) - as above, with the ids in `output` added to the result

  The graph under :upstream is walked iteratively with a visited set, so
  every id is expanded at most once: shared ancestors are not re-traversed
  and a cyclic graph terminates instead of overflowing the stack."
  ([p] (let [opts (meta p)]
         (all-upstream (:id opts) @(:registry opts))))
  ([id reg] (all-upstream id reg #{}))
  ([id reg output]
   (loop [stack (vec (get-in reg [:upstream id]))
          seen  #{}]
     (if (empty? stack)
       (set/union (set output) seen)
       (let [x    (peek stack)
             more (pop stack)]
         (if (contains? seen x)
           (recur more seen)
           (recur (into more (get-in reg [:upstream x]))
                  (conj seen x))))))))
(defn all-downstream
  "Returns the set of all ids transitively downstream of a task.

  ([p])             - reads :id and :registry (an atom) from the metadata of p
  ([id reg])        - `reg` is a registry map value
  ([id reg output]) - as above, with the ids in `output` added to the result

  The graph under :downstream is walked iteratively with a visited set, so
  every id is expanded at most once: shared descendants are not re-traversed
  and a cyclic graph terminates instead of overflowing the stack."
  ([p] (let [opts (meta p)]
         (all-downstream (:id opts) @(:registry opts))))
  ([id reg] (all-downstream id reg #{}))
  ([id reg output]
   (loop [stack (vec (get-in reg [:downstream id]))
          seen  #{}]
     (if (empty? stack)
       (set/union (set output) seen)
       (let [x    (peek stack)
             more (pop stack)]
         (if (contains? seen x)
           (recur more seen)
           (recur (into more (get-in reg [:downstream x]))
                  (conj seen x))))))))
(defn activate-task
  "Adds a task and everything transitively upstream of it to the registry's
  :active set.

  ([p])           - reads :id and :registry (the atom) from the metadata of p
  ([id registry]) - `registry` is the registry atom

  Returns the new registry value (the result of `swap!`)."
  ([p]
   (let [{:keys [id registry]} (meta p)]
     (activate-task id registry)))
  ([id registry]
   (letfn [(mark-active [reg]
             (update-in reg
                        [:active]
                        set/union
                        (conj (all-upstream id reg) id)))]
     (swap! registry mark-active))))
(defn perform
  "Starts a task, or the whole chain needed to realize a promise.

  ([p])             - if `p` is not yet realized, activates it together with
                      its upstream tasks and then attempts to start every id
                      in the registry's :active set. Returns `p`.
  ([registry id])   - looks up the promise stored under [:tasks id] and
                      delegates to the 3-arity.
  ([registry id p]) - atomically starts task `id` via `perform-task` when it
                      is not already running, has no remaining upstream ids
                      and is in :active. Returns the new registry value.

  The [:upstream id] entry is dropped only when the task is actually started.
  Dropping it for a task that is still waiting would discard its dependency
  set, so a task with several unfinished dependencies would look ready as
  soon as the first of them completed."
  ([p]
   (if (not (realized? p))
     (let [{:keys [registry]} (meta p)]
       (activate-task p)
       (doseq [id (:active @registry)]
         (perform registry id))))
   p)
  ([registry id] (perform registry id (get-in @registry [:tasks id :promise])))
  ([registry id p]
   (swap! registry
          (fn [reg]
            (let [upstream (get-in reg [:upstream id])
                  ready?   (and (not (get-in reg [:running id]))
                                (empty? upstream)
                                (get-in reg [:active id]))]
              (if ready?
                (perform-task p
                              registry
                              id
                              (update-in reg [:upstream] dissoc id)
                              (meta p))
                ;; Not ready (or already running): leave the registry exactly
                ;; as it was, dependency sets included.
                reg))))))
(defn trigger-downstream-cleanup
  "Pure-ish registry update run after task `id` has finished.

  reg - registry map value
  id  - id of the finished task

  Removes the [:downstream id] entry and visits each former downstream id:
    * :task entries are left untouched;
    * :either entries are delivered with the finished task's value,
      unlinked from their own upstream ids, and cleaned up recursively;
    * anything else (for example an entry that no longer exists) is skipped.
  Finally the downstream ids are unioned into :triggers.
  Returns the updated registry map.

  Fixes relative to the previous version: the recursive call passed four
  arguments to this two-argument function (it was threaded through `->` with
  `reg` repeated), `condp` had no default clause and threw for missing
  entries, and a debug `println` ran inside the update."
  [reg id]
  (let [downstream (get-in reg [:downstream id])
        reg        (update-in reg [:downstream] dissoc id)]
    (-> (reduce (fn [reg dnid]
                  (let [dnp (get-in reg [:tasks dnid :promise])]
                    (condp = (-> dnp meta :type)
                      :task   reg
                      :either (let [upp      (get-in reg [:tasks id :promise])
                                    upstream (get-in reg [:upstream dnid])]
                                ;; NOTE(review): perform-task's worker calls
                                ;; return-ticket (which removes [:tasks id])
                                ;; before trigger-downstream, so `upp` looks
                                ;; like it can be nil here - confirm the
                                ;; intended ordering.
                                (deliver dnp @upp)
                                (-> reg
                                    (unlink-upstream dnid upstream)
                                    (trigger-downstream-cleanup dnid)))
                      ;; Unknown or missing entry: nothing to clean up.
                      reg)))
                reg
                downstream)
        (update-in [:triggers] set/union downstream))))
(defn trigger-downstream
  "Cleans up after finished task `id` and starts the tasks it unblocked.

  registry - the registry atom
  id       - id of the finished task

  Steps:
    1. `trigger-downstream-cleanup` is applied atomically; it records the ids
       to start under :triggers.
    2. Those ids are claimed by removing them from :triggers (the key is
       dropped once it is empty).
    3. `perform` is called for each claimed id.
  Returns the registry value after step 3.

  `perform` itself swaps the registry atom, so it is called outside any
  `swap!` function: calling it inside one modifies the atom mid-update,
  forcing the outer swap to retry and repeat its side effects."
  [registry id]
  (let [cleaned  (swap! registry trigger-downstream-cleanup id)
        triggers (set (:triggers cleaned))]
    (swap! registry
           (fn [reg]
             (let [remaining (set/difference (set (:triggers reg)) triggers)]
               (if (empty? remaining)
                 (dissoc reg :triggers)
                 (assoc reg :triggers remaining)))))
    (doseq [tid triggers]
      (perform registry tid))
    @registry))
(defn upstream
  "Returns the direct upstream id set recorded for a task.

  ([p])      - reads :id and :registry (an atom) from the metadata of p
  ([id reg]) - `reg` is a registry map value"
  ([p]
   (let [{:keys [id registry]} (meta p)]
     (upstream id @registry)))
  ([id reg]
   (get (get reg :upstream) id)))
(defn downstream
  "Returns the direct downstream id set recorded for a task.

  ([p])      - reads :id and :registry (an atom) from the metadata of p
  ([id reg]) - `reg` is a registry map value"
  ([p]
   (let [{:keys [id registry]} (meta p)]
     (downstream id @registry)))
  ([id reg]
   (get (get reg :downstream) id)))
;; Scratch / REPL experiment.
;; NOTE(review): the `(comment)` form below is closed immediately, so it does
;; not wrap the `(do ...)` that follows. The `do` is therefore evaluated every
;; time this namespace is loaded: it defines a, b, c and d in the default
;; registry and calls (perform d). If this is meant to be REPL-only, move the
;; `do` inside the `comment` form.
(comment)
(do
;; a -> b -> c is a chain; each step sleeps 100 ms and increments its input.
(def a (task {:id :a}
#(do (Thread/sleep 100)
(inc %)) 1))
(def b (task {:id :b}
#(do (Thread/sleep 100)
(inc %)) a))
(def c (task {:id :c}
#(do (Thread/sleep 100)
(inc %)) b))
;; d is an `either` over b and c.
(def d (either {:id :d} b c))
*registry*
;;(meta b)
(perform d)
;;(activat)
;;(perform a)
;;(perform d)
;;(perform b)
;;b
;;c
;;
;;(:active @*registry*)
;; (all-upstream c)
;;@a
;;(perform a)
;;(perform b)
;;(upstream b)
;; (downstream b)
;;(>pst)
;;(:upstream @*registry*)
;; (:downstream @*registry*)
;;
;; (task? (task inc 1))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment