Skip to content

Instantly share code, notes, and snippets.

@minikomi
Last active June 12, 2018 06:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minikomi/0d2aa12cea77b3da70877f1f38119e9e to your computer and use it in GitHub Desktop.
Save minikomi/0d2aa12cea77b3da70877f1f38119e9e to your computer and use it in GitHub Desktop.
Async processing for plumatic.plumbing/graph
;; This buffer is for Clojure experiments and evaluation.
;; Press C-j to evaluate the last expression.
(set-env! :dependencies #(into % '[[prismatic/plumbing "0.5.5"]]))
(require '[clojure.core.async :as async :refer [>!! alts! chan close! go-loop timeout]])
(require '[plumbing.core :as p])
(require '[plumbing.graph :as g])
(require '[plumbing.fnk.pfnk :as pfnk])
(def test-graph
  "Demo graph over the fixed sample [1 2 3]: :n is the element count, :m the
  mean, :m2 the mean of the squares and :v is m2 - m^2. Every node except :xs
  sleeps for a few seconds first so that scheduling is observable."
  (g/graph
   :xs (p/fnk [] [1 2 3])
   :n  (p/fnk [xs]
         (Thread/sleep 3000)
         (count xs))
   :m  (p/fnk [xs n]
         (Thread/sleep 2000)
         (/ (p/sum identity xs) n))
   :m2 (p/fnk [xs n]
         (Thread/sleep 2000)
         (/ (p/sum (fn [x] (* x x)) xs) n))
   :v  (p/fnk [m m2]
         (Thread/sleep 3000)
         (- m2 (* m m)))))
(defn graph->requirements
  "Builds a map from every node key of `graph` to the set of input keys that
  node's function declares, as reported by `pfnk/input-schema-keys`."
  [graph]
  (into {}
        (for [[node-key node-fn] graph]
          [node-key (set (pfnk/input-schema-keys node-fn))])))
;; REPL check: evaluate the requirements map derived from test-graph.
(graph->requirements test-graph)
(defn get-doable
  "Returns the set of node ids of `graph` that are ready to run: every input
  the node declares is already a key of `results`, and the node itself has no
  entry in `results` yet.

  graph   - map of node id -> fnk, as accepted by `graph->requirements`.
  results - map of node id -> computed value.

  Prints the current set of result keys as a progress trace."
  [graph results]
  (let [reqs (graph->requirements graph)
        result-set (set (keys results))]
    (println "results" result-set)
    (set
     (for [[id req-set] reqs
           ;; Core `every?`/`contains?` instead of clojure.set/subset?: this
           ;; file never requires clojure.set, so the qualified call only
           ;; worked when something else had already loaded that namespace.
           :when (and (every? #(contains? result-set %) req-set)
                      (not (contains? result-set id)))]
       id))))
(defn start-tasks
  "Launches a future for every id in the :todo set of `status` that is not
  already in its :active set, and returns the seq of the futures it started.

  graph       - map of node id -> fnk.
  reporter-ch - channel each future reports on: `[id result]` on success, or
                the Throwable itself when the node function throws.
  status      - atom holding at least :results, :todo and :active; each
                started id is conj'ed onto :active.

  Each node function is called with the subset of :results named by its
  declared inputs. The state is read once, up front, so ids are checked
  against the :active set as it was when the function was entered."
  [graph reporter-ch status]
  (let [all-reqs (graph->requirements graph)
        {:keys [results todo active]} @status]
    ;; `for` is lazy; without doall the swap!/future side effects would only
    ;; happen when (and as far as) the caller walks the seq, so tasks could be
    ;; started late, partially, or during cleanup just to be cancelled.
    (doall
     (for [id todo
           :when (not (active id))
           :let [task (graph id)
                 args-map (select-keys results (get all-reqs id))]]
       (do
         (swap! status update :active conj id)
         (future
           (try
             (>!! reporter-ch [id (task args-map)])
             ;; Throwable, not Exception: an Error (e.g. AssertionError)
             ;; would otherwise kill the future silently and the consumer
             ;; would never hear about this node.
             (catch Throwable e
               (>!! reporter-ch e)))))))))
(defn now
  "Current wall-clock time in milliseconds since the Unix epoch."
  []
  (System/currentTimeMillis))
(defn cleanup-status
  "Shuts a pipeline run down and records its outcome in `status`.

  jobs         - futures to cancel.
  reporter-ch  - result channel, closed here.
  kill-ch      - kill channel, closed here.
  status       - atom describing the run.
  final-status - value stored under :status (also printed).

  Stores :status and a fresh :time, and removes the run-only keys :kill,
  :active and :todo. Returns the new value of the atom."
  [jobs reporter-ch kill-ch status final-status]
  (println final-status)
  ;; Close the channels before cancelling: a task that is interrupted by the
  ;; cancel tries to report its exception, and a put on a closed channel
  ;; returns at once instead of waiting on a channel nobody reads any more.
  (close! reporter-ch)
  (close! kill-ch)
  (doseq [job jobs] (future-cancel job))
  ;; One swap! so observers never see a final :status next to stale
  ;; :kill/:active/:todo entries.
  (let [finished-at (now)]
    (swap! status
           (fn [state]
             (-> state
                 (assoc :status final-status :time finished-at)
                 (dissoc :kill :active :todo))))))
(defn processing-pipeline
  "Runs the nodes of `graph` asynchronously, starting each node as soon as all
  of its declared inputs have results, and returns a status atom immediately.

  graph      - map of node id -> fnk.
  timeout-ms - optional; longest time to wait for the next message before the
               run is abandoned with :status :timeout. A fresh timeout is
               created for every wait. Defaults to 10000.

  While running, the atom holds :status :running, :time, :active, :todo,
  :results and :kill (a zero-argument function that requests cancellation).
  When the run ends, :status becomes :finished, :timeout, :killed or :error
  (with the Throwable under :error) and :kill/:active/:todo are removed.

  A node that returns nil is recorded as :ok."
  ([graph] (processing-pipeline graph 10000))
  ([graph timeout-ms]
   (let [reporter-ch (chan)
         kill-ch (chan)
         status (atom {:kill (fn [] (>!! kill-ch :die))
                       :status :running
                       :time (now)
                       :active #{}
                       :todo (get-doable graph {})
                       :results {}})]
     ;; vec forces the task launches so that :active is populated before the
     ;; first termination check below, even if start-tasks returns a lazy seq.
     (go-loop [jobs (vec (start-tasks graph reporter-ch status))]
       ;; Finish when no task is outstanding. Checking :active rather than
       ;; `realized?` on the futures avoids a race: the future that has just
       ;; reported may still be returning from its put, which would leave the
       ;; loop waiting for a message that never comes.
       (if (empty? (:active @status))
         (cleanup-status jobs reporter-ch kill-ch status :finished)
         (let [timeout-ch (timeout timeout-ms)
               ;; timeout-ch must be one of the alts! ports, otherwise the
               ;; :timeout branch below can never be selected.
               [ch-v ch] (alts! [reporter-ch kill-ch timeout-ch])]
           (cond
             (= ch timeout-ch)
             (cleanup-status jobs reporter-ch kill-ch status :timeout)

             (= ch kill-ch)
             (cleanup-status jobs reporter-ch kill-ch status :killed)

             (instance? Throwable ch-v)
             (do
               ;; Record the error first so that :status :error is never
               ;; visible without its :error entry.
               (swap! status assoc :error ch-v)
               (cleanup-status jobs reporter-ch kill-ch status :error))

             :else
             (let [[k v] ch-v
                   _ (println "received:" k "-" v)
                   ;; Only nil is replaced by :ok; a legitimate `false`
                   ;; result is kept as is.
                   new-results (assoc (:results @status) k (if (nil? v) :ok v))
                   doable (get-doable graph new-results)]
               (swap! status
                      (fn [state]
                        (-> state
                            (update :active disj k)
                            (assoc :todo doable :results new-results))))
               ;; jobs is only used for cancellation; drop finished futures
               ;; and add the newly started ones (into realizes them all).
               (recur (into (filterv (complement realized?) jobs)
                            (start-tasks graph reporter-ch status))))))))
     status)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment