Skip to content

Instantly share code, notes, and snippets.

@gardnervickers
Last active November 20, 2015 19:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gardnervickers/df38ed0baac31f5920b1 to your computer and use it in GitHub Desktop.
Save gardnervickers/df38ed0baac31f5920b1 to your computer and use it in GitHub Desktop.
(defn start-go-peer
  "Connects `in-chan` to `out-chan` through the transducer `xf` by starting
  an `async/pipeline`, and returns a map describing that peer.

  Arities:
    [in-chan xf]                      - output is a fresh `(chan)`, parallelism 1
    [in-chan xf out-chan]             - parallelism 1
    [in-chan xf out-chan parallelism] - everything explicit

  The returned map holds the channels and transducer under the
  `:fauonyx/in`, `:fauonyx/out` and `:fauonyx/transform` keys, plus a
  `:fauonyx/running?` atom initialised to `true` (nothing in this function
  ever changes it)."
  ([in-chan xf]
   (start-go-peer in-chan xf (chan)))
  ([in-chan xf out-chan]
   (start-go-peer in-chan xf out-chan 1))
  ([in-chan xf out-chan parallelism]
   (let [running-flag (atom true)]
     ;; Start moving values first; the descriptor map is just bookkeeping.
     (async/pipeline parallelism out-chan xf in-chan)
     {:fauonyx/in        in-chan
      :fauonyx/out       out-chan
      :fauonyx/transform xf
      :fauonyx/running?  running-flag})))
(defn instrument-catalog-entries
  "Returns a vector with one map per catalog entry, where each entry has
  been merged with the peer map produced by `start-go-peer`.

  Entries carrying an `:onyx/plugin` key get a pass-through transducer
  `(map identity)`; every other entry is mapped through whatever
  `(resolve-fn entry)` returns."
  [catalog]
  (let [entry->xf  (fn [entry]
                     (if (:onyx/plugin entry)
                       (map identity)
                       (map (resolve-fn entry))))
        instrument (fn [entry]
                     ;; Each entry gets its own fresh input channel.
                     (merge entry
                            (start-go-peer (chan) (entry->xf entry))))]
    (mapv instrument catalog)))
(defn build-network
  "Recursively wires the instrumented tasks together, starting at `node`.

  Arguments:
    catalog - map of task name -> instrumented catalog entry (must contain
              `:fauonyx/in` and `:fauonyx/out` channels)
    graph   - dependency graph queried with `dep/immediate-dependents`
    node    - name of the task to wire up
    in-chan - upstream channel to pipe into this task's input, or nil when
              the task is a root with no upstream

  Returns a (nested) vector of the results of wiring each dependent; the
  value is only useful for forcing the recursion eagerly via `mapv`."
  [catalog graph node in-chan]
  (let [dependents (dep/immediate-dependents graph node)
        entry      (get catalog node)]
    ;; The root task is wired with a nil upstream. `async/pipe` on nil would
    ;; spawn a go-loop that immediately fails on `(<! nil)`, so only pipe
    ;; when there really is an upstream channel.
    (when in-chan
      (async/pipe in-chan (:fauonyx/in entry)))
    ;; NOTE(review): every dependent pipes from the same output channel, so
    ;; with more than one dependent the segments are shared out between them
    ;; rather than copied to each -- confirm that this is the intended
    ;; fan-out behaviour.
    (mapv #(build-network catalog graph % (:fauonyx/out entry)) dependents)))
(defn new-faux-onyx
  "Creates an empty faux-Onyx handle: an atom holding nil until a job is
  stored in it."
  []
  (atom nil))
(defn submit-faux-job
  "Instruments `catalog`, wires the tasks together according to `workflow`
  and stores the resulting name -> entry map in the `fauonyx` atom.

  Returns the stored map (the value returned by `reset!`). `lifecycles` is
  accepted but not used by this function."
  [fauonyx workflow catalog lifecycles]
  (let [instrumented (instrument-catalog-entries catalog)
        ;; Index the instrumented entries by task name; a later entry with
        ;; the same name replaces an earlier one.
        by-name      (into {} (map (juxt :onyx/name identity)) instrumented)
        ;; Result is unused; the call is kept so its behaviour is unchanged.
        _tasks       (p/discover-tasks catalog workflow)]
    ;; Wiring starts at the task literally named :in, with no upstream
    ;; channel -- i.e. this assumes the workflow's root task is :in.
    (build-network by-name (p/to-dependency-graph workflow) :in nil)
    (reset! fauonyx by-name)))
(defn send-segment-to-fauonyx
  "Offers `segment` to the input channel of the task named `root` in the
  job held by the `fauonyx` atom.

  Uses `async/offer!`, which never blocks (so a full pipeline cannot freeze
  the REPL); returns whatever `offer!` returns."
  [fauonyx segment root]
  (-> @fauonyx
      (get-in [root :fauonyx/in])
      (async/offer! segment)))
(defn get-segment-from-fauonyx
  "Polls the output channel of the task named `exit` in the job held by the
  `fauonyx` atom.

  Uses the non-blocking `async/poll!` and returns its result."
  [fauonyx exit]
  (-> @fauonyx
      (get-in [exit :fauonyx/out])
      async/poll!))
(def send-segment-through-onyx-spec
  "Command spec for pushing a generated segment into the faux-Onyx job at
  the :in task."
  {:real/command #'identity
   ;; Arguments: the model itself, a generated {:data <nat>} style map and
   ;; the task name :in.
   ;; NOTE(review): `gen/nat` is invoked as a function here; if `gen` is
   ;; clojure.test.check.generators, `nat` is a generator value rather than
   ;; a function -- confirm.
   :model/args   (fn [faux]
                   [faux (gen/map (gen/return :data) (gen/nat)) :in])
   ;; NOTE(review): this returns the result of the send, not the model
   ;; that was passed in -- confirm that is what the test harness expects
   ;; as the next state.
   :next-state   (fn [faux [_ seg] _result]
                   (send-segment-to-fauonyx faux seg :in))})
(def get-segment-from-onyx-spec
  "Command spec for reading a segment from the faux-Onyx job at the :out
  task."
  {:real/command #'identity
   ;; Same argument shape as the send spec; the generated segment and the
   ;; :in task name are not used by :next-state below.
   ;; NOTE(review): `gen/nat` is invoked as a function here; if `gen` is
   ;; clojure.test.check.generators, `nat` is a generator value rather than
   ;; a function -- confirm.
   :model/args   (fn [faux]
                   [faux (gen/map (gen/return :data) (gen/nat)) :in])
   ;; NOTE(review): this returns the polled segment (or nil), not the model
   ;; that was passed in -- confirm that is what the test harness expects
   ;; as the next state.
   :next-state   (fn [faux [_ _seg] _result]
                   (get-segment-from-fauonyx faux :out))})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment