Skip to content

Instantly share code, notes, and snippets.

Last active October 2, 2016 19:52
Show Gist options
  • Save dadair-ca/84932f9e77eb1f62b63ed606b898a214 to your computer and use it in GitHub Desktop.
Save dadair-ca/84932f9e77eb1f62b63ed606b898a214 to your computer and use it in GitHub Desktop.
core async processing improvements
;; Forecast generation
(defn inst->forecasts [inst]
(for [r (range 1 25)]
[(c/to-sql-time (t/plus (without-minutes inst) (t/hours r))) (rand-int 50)]))
;; Async
(defn process-fn [inst]
{:vals (inst->forecasts inst)})
;; there is a launch function for each variable of interest.
;; in this case, this is the "forecast" launcher. There are other namespaces
;; with other launch functions, say for "variable 1" "variable 2", etc. Really, only
;; `schedule`, `process-fn`, and the persist function (`insert-forecasts` here) are unique to each job.
;; `in`, `out`, and `chime` are shown more clearly for their intent in the pipeline model image linked
;; in the `` file included in this gist.
(defn launch []
(let [in (chan)
out (chan)
schedule (p/periodic-seq
(let [n (t/now)]
(t/date-time (t/year n) (t/month n) (t/day n) (t/hour n) 0))
(t/minutes 1))
chime (chime-ch schedule)]
((persist insert-forecasts db) out)
((process process-fn) in out)
(realtime chime in)
[in out chime]))

Example pipeline model:

Each variable would duplicate this model. Only some variables need the bootstrap function, otherwise it's a sequential pipeline.

(ns ngin.utils
(:require [clojure.core.async :refer [<! >! go]]))
(defn realtime [ch in]
(println (str ">>> started `realtime` in " *ns*))
(go (loop []
(when-let [v (<! ch)]
(>! in v)
(println (str ">>> closing `realtime` in " *ns*))))
(defn process [f]
(fn [in out]
(println (str ">>> started `process` in " *ns*))
(go (loop []
(when-let [inst (<! in)]
(>! out (f inst))
(println (str ">>> closing `process` in " *ns*)))))
(defn persist [f db]
(fn [ch]
(println (str ">>> started `persist` in " *ns*))
(go (loop []
(when-let [v (<! ch)]
(f db v)
(println (str ">>> closing `persist` in " *ns*)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment