(ns qaxl.core | |
(:use clojure.walk clojure.pprint qaxl.cache) | |
(:require [co.paralleluniverse.pulsar.async :as qa] | |
[clojure.core.async :as a] | |
[co.paralleluniverse.pulsar.core :as q] | |
[clojure.test :refer [function?]] | |
[co.paralleluniverse.fiber.httpkit.client :as hk] | |
[co.paralleluniverse.pulsar.core :refer [fiber]] | |
[clojure.core.match :refer [match]])) | |
(declare parallelize) | |
(defmacro qaxl | |
"Parallelize and evaluate an expression" | |
[form] | |
(:form (parallelize form))) | |
(defn- res= [a b] | |
(or (= a b) | |
(let [ra (resolve a) | |
rb (resolve b)] | |
(and ra rb (= (resolve a) (resolve b)))))) | |
;; If there's a channel that will return the value of this symbol, substitute its deref. | |
(defn- parallelize-subst [s s2p] | |
(if (symbol? s) | |
(if (contains? s2p s) | |
{:form `(deref ~(get s2p s)) :s2p s2p :par true} | |
{:form s :par (some-> s resolve var-get q/suspendable?) :s2p s2p}) | |
{:form s :s2p s2p})) | |
(defn- parallelize-forms [forms s2p] (map #(parallelize % s2p) forms)) | |
(defn- par-to-bindings [forms s2p] ;; => [bs args] | |
(let [ps (parallelize-forms forms s2p) | |
[bs args] (reduce (fn [[bs args] p] | |
(if (:par p) | |
(let [ch (gensym "p")] | |
[(concat bs [ch `(q/promise (fn [] ~(:form p)))]) | |
(conj args `(deref ~ch))]) | |
[bs (conj args (:form p))])) | |
[[] []] | |
ps)] | |
[ps bs (seq args)])) | |
(defn- parallelize-func [forms s2p] | |
(when (:trace s2p) (println "parallelize-func" forms)) | |
(let [[ps bs forms] (par-to-bindings forms s2p) | |
par (some :par ps)] | |
{:form (if (seq bs) `(let [~@bs] (~@forms)) `(~@forms)) :par par :s2p s2p})) | |
;; Possibly lazy | |
(defn- parallelize-special [[s & forms] s2p] | |
(when (:trace s2p) (println "parallelize-special" s forms)) | |
(let [ps (parallelize-forms forms s2p)] | |
{:form `(~s ~@(map :form ps)) :par (some :par ps) :s2p s2p})) | |
(defn- parallelize-coll [form s2p] | |
(let [[ps bs forms] (par-to-bindings (seq form) s2p)] | |
{:form (if (seq bs) `(let [~@bs] ~(into (empty form) forms)) form) | |
:par (some :par ps) :s2p s2p})) | |
(defn- parallelize-map [[_ & forms] s2p] | |
(when (:trace s2p) (println "parallelize-map" forms)) | |
(let [[ps bs [f & args]] (par-to-bindings forms s2p) | |
m1 (if-not (:par (first ps)) | |
`(map ~f ~@args) | |
`(map deref | |
(doall (map (fn [& xs#] | |
(q/promise (fn [] (apply ~f xs#)))) | |
~@args ))))] | |
{:form (if (seq bs) `(let [~@bs] ~m1) m1) :par (some :par ps)})) | |
(defn parallelize-let [[_ bs & forms] s2p] | |
(when (:trace s2p) (println "parallelize-let" bs forms)) | |
(let [[bs1 bs2] (reduce | |
(fn [[bs1 bs2 s2p] [s form]] | |
(let [{:keys [form par]} (parallelize form s2p)] | |
(if par | |
(let [ch (gensym "p")] | |
[(concat bs1 [ch `(q/promise (fn [] ~form))]) | |
(concat bs2 [s `(deref ~ch)]) | |
(assoc s2p s ch)]) | |
[bs1 (concat bs2 [s form]) s2p]))) | |
[[] [] s2p] | |
(partition 2 bs)) | |
ps (parallelize-forms forms s2p)] | |
{:form `(let [~@(concat bs1 bs2)] ~@(map :form ps)) :par (or (seq bs1) (some :par ps))})) | |
(defn parallelize | |
"Parallelize a form, returning map: | |
:form Parallelized form that can be substituted. | |
:par parallel? | |
:s2p Map of user symbols to channel symbols | |
To evaluate, one would apply the bindings and then dereference the channel." | |
[form & [s2p]] | |
(when (:trace s2p) (println "parallelize " form (type form))) | |
(cond | |
(nil? s2p) (parallelize form {}) | |
(seq? form) | |
(if-let [f (as-> (first form) f (when (symbol? f) f))] | |
(cond | |
(res= 'let f) (parallelize-let form s2p) | |
(res= 'map f) (parallelize-map form s2p) | |
(function? f) (parallelize-func form s2p) | |
(:macro (meta (resolve f))) (parallelize (macroexpand form) s2p) | |
:else (parallelize-special form s2p)) | |
(parallelize-coll form s2p)) | |
(coll? form) (parallelize-coll form s2p) | |
:else (parallelize-subst form s2p))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment