Last active
November 20, 2015 19:18
-
-
Save gardnervickers/df38ed0baac31f5920b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn start-go-peer
  "Connects `in-chan` to `out-chan` through the transducer `xf` with
  `async/pipeline` and returns a map describing the peer:

    :fauonyx/out        the output channel
    :fauonyx/in         the input channel
    :fauonyx/transform  the transducer `xf`
    :fauonyx/running?   an atom initialised to true

  `out-chan` defaults to a fresh unbuffered `(chan)` and `parallelism`
  defaults to 1."
  ([in-chan xf]
   (start-go-peer in-chan xf (chan)))
  ([in-chan xf out-chan]
   (start-go-peer in-chan xf out-chan 1))
  ([in-chan xf out-chan parallelism]
   ;; Start moving values from in-chan to out-chan before handing back
   ;; the descriptor map.
   (async/pipeline parallelism out-chan xf in-chan)
   {:fauonyx/out out-chan
    :fauonyx/in in-chan
    :fauonyx/transform xf
    :fauonyx/running? (atom true)}))
(defn instrument-catalog-entries
  "Returns a vector with one map per entry of `catalog`: the original
  entry merged with the peer map produced by `start-go-peer` on a fresh
  input channel.

  Entries that carry :onyx/plugin get a pass-through transducer
  (`(map identity)`); every other entry gets `(map (resolve-fn entry))`."
  [catalog]
  (letfn [(transducer-for [entry]
            (if (:onyx/plugin entry)
              (map identity)
              ;; presumably resolve-fn returns the task's function;
              ;; it is defined outside this section -- verify there.
              (map (resolve-fn entry))))
          (instrument [entry]
            (merge entry
                   (start-go-peer (chan) (transducer-for entry))))]
    (mapv instrument catalog)))
(defn build-network
  "Recursively wires the channels of the instrumented `catalog` together
  following `graph`.

  catalog  map of task name -> instrumented catalog entry (must contain
           :fauonyx/in and :fauonyx/out channels)
  graph    dependency graph queried with `dep/immediate-dependents`
  node     name of the task to wire up
  in-chan  upstream channel feeding `node`, or nil when `node` is a root
           with no upstream

  `in-chan` is piped into the node's :fauonyx/in, then each immediate
  dependent is wired with the node's :fauonyx/out as its upstream.
  Returns a (nested) vector of the recursive results; the function is
  called for its side effects."
  [catalog graph node in-chan]
  (let [dependents (dep/immediate-dependents graph node)
        entry      (get catalog node)]
    ;; A root task is wired with a nil upstream. Piping from nil would
    ;; make pipe's go block fail on its first take, so skip it.
    (when in-chan
      (async/pipe in-chan (:fauonyx/in entry)))
    ;; NOTE(review): every dependent pipes from the same out channel, so
    ;; with more than one dependent each segment reaches only one of
    ;; them -- confirm whether fan-out should broadcast instead.
    (mapv #(build-network catalog graph % (:fauonyx/out entry)) dependents)))
(defn new-faux-onyx
  "Creates the state holder for a faux Onyx instance: an atom whose
  initial value is nil."
  []
  (atom nil))
(defn submit-faux-job
  "Instruments `catalog`, indexes the instrumented entries by :onyx/name,
  wires their channels together according to `workflow`, and stores the
  indexed catalog in the `fauonyx` atom. Returns the stored map.

  The root task is hard-coded as :in and is wired with no upstream
  channel. `lifecycles` is accepted but not used here."
  [fauonyx workflow catalog lifecycles]
  (let [instrumented (instrument-catalog-entries catalog)
        ;; later entries win when two share the same :onyx/name
        by-name      (into {} (map (juxt :onyx/name identity)) instrumented)
        ;; The result is not used; the call is kept so that whatever
        ;; discover-tasks does with the catalog/workflow still happens.
        _            (p/discover-tasks catalog workflow)]
    ;; assuming that the first item in the wf is your root
    (build-network by-name (p/to-dependency-graph workflow) :in nil)
    (reset! fauonyx by-name)))
(defn send-segment-to-fauonyx
  "Offers `segment` to the :fauonyx/in channel of task `root` in the
  catalog held by the `fauonyx` atom. Returns the result of
  `async/offer!`."
  [fauonyx segment root]
  ;; offer! is used rather than a blocking put -- "Stop freezing my repl".
  (-> @fauonyx
      (get-in [root :fauonyx/in])
      (async/offer! segment)))
(defn get-segment-from-fauonyx
  "Polls the :fauonyx/out channel of task `exit` in the catalog held by
  the `fauonyx` atom. Returns the result of `async/poll!`."
  [fauonyx exit]
  (-> @fauonyx
      (get-in [exit :fauonyx/out])
      (async/poll!)))
;; Command spec that pushes a generated segment into the faux Onyx
;; network through the :in task.
;;   :model/args   -> [state, generator of {:data <nat>} maps, :in]
;;   :next-state   -> offers the segment (second argument) to :in
;;   :real/command -> the var #'identity
;; NOTE(review): if `gen` is clojure.test.check.generators, `gen/nat` is a
;; generator value rather than a function -- confirm `(gen/nat)` is intended.
(def send-segment-through-onyx-spec
  {:model/args   (fn [state]
                   (let [segment-gen (gen/map (gen/return :data) (gen/nat))]
                     [state segment-gen :in]))
   :next-state   (fn [state [_ segment] _result]
                   (send-segment-to-fauonyx state segment :in))
   :real/command #'identity})
;; Command spec that polls the faux Onyx network for a segment at the
;; :out task.
;;   :model/args   -> [state, generator of {:data <nat>} maps, :in]
;;   :next-state   -> polls :out; the generated segment is destructured
;;                    but not used
;;   :real/command -> the var #'identity
;; NOTE(review): if `gen` is clojure.test.check.generators, `gen/nat` is a
;; generator value rather than a function -- confirm `(gen/nat)` is intended.
(def get-segment-from-onyx-spec
  {:model/args   (fn [state]
                   (let [segment-gen (gen/map (gen/return :data) (gen/nat))]
                     [state segment-gen :in]))
   :next-state   (fn [state [_ segment] _result]
                   (get-segment-from-fauonyx state :out))
   :real/command #'identity})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment