Skip to content

Instantly share code, notes, and snippets.

@otfrom

otfrom/async.clj Secret

Created October 20, 2020 16:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save otfrom/2f8e456442485fbb4056aed3f4f109b1 to your computer and use it in GitHub Desktop.
Save otfrom/2f8e456442485fbb4056aed3f4f109b1 to your computer and use it in GitHub Desktop.
(ns witan.send.driver.async
(:require [clojure.core.async :as a]))
(defn out-chan
([c] {::out-chan c})
([] (out-chan (a/chan (* 1024 1024)))))
(defn add-chan [m k f]
(assoc m k (f m)))
(defn transducef [xf rf mult-key]
(fn [mults]
(a/transduce
xf
rf
(rf)
(a/tap (mult-key mults) (a/chan 1024)))))
(defn sorted-set-rf
([] (transient #{}))
([acc] (into (sorted-set) (persistent! acc)))
([acc new] (conj! acc new)))
(defn run-async-machine
"mults is a map of async/mults that will be used by the transducers in receiver-fns.
run-fn is a procedure that takes an out-chan and puts the input data
on it when called.
receiver-fns is a map of functions that return transducers that will
have a mult that wraps the out-chan passed into it.
Results will be returned in a map where the result of each of the
transducers returned b receiver-fns has <!! called on it."
[{::keys [out-chan] :as mults} run-fn receiver-fns]
(let [receivers (into {}
(map (fn [[k f]] [k (f mults)]))
receiver-fns)]
(run-fn out-chan)
(into {}
(map (fn [[k r]]
[k (a/<!! r)]))
receivers)))
(comment
(def first-calendar-year 2020)
(def simulated-transitions-data-dir "../witan.send/data/demo/results/simulated-transitions/")
(require '[witan.send.driver.simulated-transitions :as st])
(require '[witan.send.driver.simulated-transitions.io :as stio])
(def machine-results
(run-async-machine
(-> (out-chan)
(add-chan ::transitions-mult
(fn [m] (a/mult (::out-chan m))))
(add-chan ::census-chan
(fn [m] (a/tap (::transitions-mult m) (a/chan 1024 (st/simulated-transitions->census-xf first-calendar-year)))))
(add-chan ::census-mult
(fn [m] (a/mult (::census-chan m)))))
(fn [c] (stio/async-nippy->data simulated-transitions-data-dir c))
{::calendar-years (transducef
(comp
(filter #(= 1 (:simulation %)))
(map :calendar-year))
sorted-set-rf
::transitions-mult)
::settings (transducef
(comp
(filter #(= 1 (:simulation %)))
(map :setting))
sorted-set-rf
::census-mult)}))
)
(ns witan.send.driver.simulated-transitions.io
(:require [clojure.core.async :as a]
[clojure.data.csv :as csv]
[clojure.java.io :as io]
[net.cgrand.xforms :as x]
[taoensso.nippy :as nippy]
[witan.send.driver.ingest :as i]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; nippy io
(defn ->nippy [dirname data]
(run!
(fn [data]
(let [idx (-> data first :simulation)]
(nippy/freeze-to-file (str dirname "simulated-transitions-" idx ".npy") data)))
(partition-by (fn [{:keys [simulation]}] (quot simulation 100)) data)))
(def nippy->data-xf
(comp
(filter (fn [f] (re-find #"npy$" (.getName f))))
(map (fn [f] (println "Processing file: " (.getName f)) f))
(mapcat (fn [f] (into []
(map-indexed
(fn [idx transition]
(assoc transition
::filename (.getName f)
::file-idx idx)))
(nippy/thaw-from-file f))))))
(defn file-list [dirname]
(.listFiles (io/file dirname)))
(defn nippy->data [dirname]
(eduction
nippy->data-xf
(file-list dirname)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; async base
(defn async-nippy->data [simulated-transitions-data-dir out-chan]
(let [in-chan (a/to-chan!! (sort-by #(.getName %) (file-list simulated-transitions-data-dir)))
_ (a/pipeline-blocking
3
out-chan
nippy->data-xf
in-chan
true
(fn [e]
(a/close! out-chan)
(a/close! in-chan)))]
out-chan))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment