Skip to content

Instantly share code, notes, and snippets.

@w01fe
Forked from anonymous/graph_async.clj
Created February 4, 2013 21:47
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save w01fe/4710008 to your computer and use it in GitHub Desktop.
Save w01fe/4710008 to your computer and use it in GitHub Desktop.
(ns plumbing.graph-async
(:require
[plumbing.fnk.pfnk :as pfnk]
[plumbing.fnk.schema :as schema]
[plumbing.core :as plumbing]
[plumbing.graph :as graph]))
;; An async function carries ^:async metadata and takes :callback as a required key.
;; TODO: redo with just promises/futures once they have callback options
;; TODO: make nicer way to specify async fnks?
(defn asyncify
  "Take a fnk f and return an async version of f with ^:async metadata,
   which accepts a keyword parameter :callback called with the result.
   If f is already async, returns it unchanged (after checking that its
   input schema really contains :callback); otherwise, wraps it in an
   async wrapper that calls f synchronously and passes the return value
   to :callback.

   A non-async f must not already declare a :callback input key."
  [f]
  (if (:async (meta f))
    (do (assert (contains? (pfnk/input-schema f) :callback))
        f)
    (do (assert (not (contains? (pfnk/input-schema f) :callback)))
        ;; Tag the wrapper with ^:async so that it satisfies the same
        ;; contract as a hand-written async fnk (e.g. it can be passed to
        ;; syncify, or to asyncify again, without tripping an assertion).
        (vary-meta
         (pfnk/fn->fnk
          ;; Strip :callback before calling f, since f's own schema does
          ;; not include it, then hand f's result to the callback.
          (fn [m] ((:callback m) (f (dissoc m :callback))))
          [(assoc (pfnk/input-schema f) :callback true)
           (pfnk/output-schema f)])
         assoc :async true))))
(defn syncify
  "Turn an async fnk f into an ordinary blocking fnk. The returned fnk
   invokes f with a :callback that delivers to a promise, waits on that
   promise, and returns the delivered value. Its input schema is f's
   input schema without :callback; its output schema is f's.

   f must carry ^:async metadata and declare :callback in its input schema."
  [f]
  (assert (and (:async (meta f)) (contains? (pfnk/input-schema f) :callback)))
  (let [sync-input-schema (dissoc (pfnk/input-schema f) :callback)
        io-schemata [sync-input-schema (pfnk/output-schema f)]
        blocking-call (fn [args]
                        (let [result (promise)
                              on-done (fn [v] (deliver result v))]
                          (f (assoc args :callback on-done))
                          ;; Block until f (or whatever it spawned) calls back.
                          (deref result)))]
    (pfnk/fn->fnk blocking-call io-schemata)))
(defn async-parallel-compile
  "Experimental.
   Compile a hierarchical graph with (some) async fnks into an async fnk.
   An async fnk has ^:async metadata, and accepts a special :callback
   keyword parameter which will be called with the result of the function
   (rather than returning it directly.)
   Each node function will be launched in a future as soon as its dependencies
   have been fully computed. Thus,
   (syncify (async-parallel-compile g))
   is similar to graph/parallel-compile, except that:
   - It can handle async fnks as well as ordinary sync fnks.
   - It makes smarter use of threads, only launching a future when a node's
     dependencies have been computed (rather than launching one future for
     each node immediately, each of which blocks until its dependencies
     are computed).

   If g is a function it is simply asyncified; otherwise every node is
   compiled recursively. The compiled fnk throws (via schema/assert-iae)
   when required top-level keys are missing from its input map, and
   otherwise calls :callback with a map of the results for g's keys.
   A graph with no nodes calls :callback immediately with an empty map."
  [g]
  (if (fn? g)
    (asyncify g)
    (let [g (graph/->graph (plumbing/map-vals async-parallel-compile g))
          req-ks (schema/required-toplevel-keys (pfnk/input-schema g))
          ;; Dependency edges as [parent child] pairs. Every node is also a
          ;; parent of the sentinel ::done, so ::done becomes runnable only
          ;; after all nodes have produced a result.
          edges (concat
                 (for [[k v] g
                       parent-k (filter g (keys (pfnk/input-schema v)))]
                   [parent-k k])
                 (for [k (keys g)]
                   [k ::done]))
          child-map (->> edges
                         (group-by first)
                         (plumbing/map-vals #(set (map second %))))
          parent-map (->> edges
                          (group-by second)
                          (plumbing/map-vals #(set (map first %))))]
      (vary-meta
       (pfnk/fn->fnk
        (fn [m]
          (let [missing-keys (seq (remove #(contains? m %) req-ks))]
            (schema/assert-iae (empty? missing-keys)
                               "Missing top-level keys in graph input: %s"
                               (set missing-keys)))
          (let [;; Per-invocation state: the parents each node still waits
                ;; on, and the accumulated inputs + node results.
                remaining-parents (atom parent-map)
                results (atom m)
                run-node (fn run-node [k]
                           (if (= ::done k)
                             ((:callback m) (select-keys @results (keys g)))
                             (let [f (g k)]
                               ;; NOTE(review): an exception thrown inside this
                               ;; future is never observed here, so :callback
                               ;; would not be called -- confirm the desired
                               ;; error handling.
                               (future (f (assoc (select-keys @results (keys (pfnk/input-schema f)))
                                            :callback (fn [r]
                                                        (swap! results assoc k r)
                                                        (doseq [c (child-map k)]
                                                          ;; swap! returns the updated map, so only
                                                          ;; the caller that removes a child's last
                                                          ;; parent sees an empty set and starts it.
                                                          (when (empty? (c (swap! remaining-parents
                                                                                  update-in [c]
                                                                                  disj k)))
                                                            (run-node c))))))))))]
            ;; Kick off everything that has no parents. ::done is included
            ;; so that an empty graph (where ::done has no parents) still
            ;; invokes the callback instead of never completing.
            (doseq [k (concat (keys g) [::done])]
              (when (empty? (parent-map k))
                (run-node k)))))
        [(assoc (pfnk/input-schema g) :callback true)
         (pfnk/output-schema g)])
       assoc :async true))))
(comment
  ;; REPL demo: compile a small graph that mixes a sync subnode (:a1), an
  ;; async subnode (:a2, tagged ^:async and taking `callback`), and two sync
  ;; nodes that each depend on one of them; run it synchronously with
  ;; {:x 1}, print the result, and time the whole thing.
  (time
   (let [slow-graph {:a {:a1 (plumbing/fnk [x] (Thread/sleep 1000) (inc x))
                         :a2 (vary-meta
                              (plumbing/fnk [x callback]
                                (Thread/sleep 1000)
                                (callback (- x 10)))
                              assoc :async true)}
                     :b (plumbing/fnk [[:a a1]] (Thread/sleep 1000) (* a1 2))
                     :c (plumbing/fnk [[:a a2]] (Thread/sleep 1000) (* a2 2))}
         run-blocking (syncify (async-parallel-compile slow-graph))]
     (println (run-blocking {:x 1})))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment