-
-
Save otfrom/2f8e456442485fbb4056aed3f4f109b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns witan.send.driver.async | |
(:require [clojure.core.async :as a])) | |
(defn out-chan
  "Builds the initial channel map for the async machine.

  With one argument, wraps the supplied channel `c` in a map under the
  `::out-chan` key. With no arguments, first creates a channel with a
  buffer of 1024 * 1024 and wraps that."
  ([]
   (let [buffer-size (* 1024 1024)]
     (out-chan (a/chan buffer-size))))
  ([c]
   (assoc {} ::out-chan c)))
(defn add-chan
  "Returns `m` with key `k` associated to the result of calling `f` on `m`.

  `f` receives the map as it is *before* `k` is added, so it can derive
  the new entry from entries that are already present."
  [m k f]
  (let [derived (f m)]
    (assoc m k derived)))
(defn transducef
  "Returns a function of a channel map (`mults`).

  When called, that function looks up `mult-key` in `mults`, taps the
  value found there with a fresh channel (buffer 1024), and starts an
  `a/transduce` over the tapped channel using transducer `xf`, reducing
  function `rf` and `(rf)` as the initial value. The value returned is
  whatever `a/transduce` returns."
  [xf rf mult-key]
  (fn [mults]
    (let [init   (rf)
          source (mult-key mults)
          tapped (a/tap source (a/chan 1024))]
      (a/transduce xf rf init tapped))))
(defn sorted-set-rf
  "Reducing function that collects items into a sorted set.

  - 0-arity: returns an empty transient set used as the accumulator.
  - 1-arity (completion): makes the accumulator persistent and pours it
    into a sorted set.
  - 2-arity (step): adds `new` to the transient accumulator."
  ([]
   (transient (hash-set)))
  ([acc]
   (->> acc
        persistent!
        (into (sorted-set))))
  ([acc new]
   (conj! acc new)))
(defn run-async-machine
  "Wires up the receivers, runs the producer, and collects the results.

  mults is a map that must contain `::out-chan`; the whole map is handed
  to every function in receiver-fns.

  run-fn is a procedure that is called with the out-chan and is expected
  to put the input data on it.

  receiver-fns is a map of key -> function. Each function is called with
  mults *before* run-fn is invoked, and must return a channel (for
  example the result of a function built by `transducef`).

  Returns a map with the same keys as receiver-fns, where each value is
  the result of calling <!! on the channel returned by the corresponding
  receiver function."
  [{::keys [out-chan] :as mults} run-fn receiver-fns]
  (let [result-chans (into {}
                           (for [[label make-receiver] receiver-fns]
                             [label (make-receiver mults)]))]
    ;; Receivers are all set up at this point, so start feeding data.
    (run-fn out-chan)
    (into {}
          (for [[label result-chan] result-chans]
            [label (a/<!! result-chan)]))))
(comment
  ;; REPL walkthrough: read the simulated transitions from disk and, in a
  ;; single pass, collect the distinct calendar years and settings seen in
  ;; simulation 1.

  (def first-calendar-year 2020)
  (def simulated-transitions-data-dir "../witan.send/data/demo/results/simulated-transitions/")

  (require '[witan.send.driver.simulated-transitions :as st])
  (require '[witan.send.driver.simulated-transitions.io :as stio])

  (def machine-results
    (let [first-simulation? (fn [row] (= 1 (:simulation row)))
          ;; out-chan -> transitions mult -> census chan -> census mult
          channels  (-> (out-chan)
                        (add-chan ::transitions-mult
                                  (fn [chans] (a/mult (::out-chan chans))))
                        (add-chan ::census-chan
                                  (fn [chans]
                                    (a/tap (::transitions-mult chans)
                                           (a/chan 1024 (st/simulated-transitions->census-xf first-calendar-year)))))
                        (add-chan ::census-mult
                                  (fn [chans] (a/mult (::census-chan chans)))))
          load-data (fn [c] (stio/async-nippy->data simulated-transitions-data-dir c))
          receivers {::calendar-years (transducef
                                       (comp (filter first-simulation?)
                                             (map :calendar-year))
                                       sorted-set-rf
                                       ::transitions-mult)
                     ::settings       (transducef
                                       (comp (filter first-simulation?)
                                             (map :setting))
                                       sorted-set-rf
                                       ::census-mult)}]
      (run-async-machine channels load-data receivers)))

  )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns witan.send.driver.simulated-transitions.io | |
(:require [clojure.core.async :as a] | |
[clojure.data.csv :as csv] | |
[clojure.java.io :as io] | |
[net.cgrand.xforms :as x] | |
[taoensso.nippy :as nippy] | |
[witan.send.driver.ingest :as i])) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; nippy io | |
(defn ->nippy
  "Writes `data` to nippy files, one file per run of consecutive records
  whose `:simulation` falls in the same block of 100 (by `quot`).

  Each file is named `simulated-transitions-<idx>.npy`, where `<idx>` is
  the `:simulation` of the first record in that run. The file name is
  appended directly to `dirname` with `str`, so `dirname` needs to end
  with a path separator. Returns nil."
  [dirname data]
  (doseq [batch (partition-by (fn [{sim :simulation}] (quot sim 100)) data)]
    (let [idx  (:simulation (first batch))
          path (str dirname "simulated-transitions-" idx ".npy")]
      (nippy/freeze-to-file path batch))))
(def nippy->data-xf
  "Transducer from files to transition records.

  Keeps only files whose name ends in \"npy\", prints a progress line for
  each one, thaws it with nippy, and emits every thawed item with two
  extra keys: `::filename` (the file's name) and `::file-idx` (the item's
  position within that file)."
  (let [npy-file?   (fn [f] (re-find #"npy$" (.getName f)))
        announce    (fn [f]
                      (println "Processing file: " (.getName f))
                      f)
        tag-with    (fn [f]
                      (fn [idx transition]
                        (assoc transition
                               ::filename (.getName f)
                               ::file-idx idx)))
        thaw-tagged (fn [f]
                      (into []
                            (map-indexed (tag-with f))
                            (nippy/thaw-from-file f)))]
    (comp
     (filter npy-file?)
     (map announce)
     (mapcat thaw-tagged))))
(defn file-list
  "Returns the result of `.listFiles` on the file denoted by `dirname`
  (an array of files, or nil when `dirname` is not a listable directory)."
  [dirname]
  (-> dirname
      io/file
      .listFiles))
(defn nippy->data
  "Returns an eduction that applies `nippy->data-xf` to the files listed
  in `dirname`. Nothing is read until the eduction is reduced/iterated."
  [dirname]
  (->> dirname
       file-list
       (eduction nippy->data-xf)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; async base | |
(defn async-nippy->data
  "Streams the transitions stored in `simulated-transitions-data-dir` onto
  `out-chan` and returns `out-chan`.

  The directory's files are sorted by name and fed through
  `a/pipeline-blocking` (parallelism 3) using `nippy->data-xf`. `out-chan`
  is closed when the input is exhausted.

  If the transformation throws, the exception is reported on *err* and
  both channels are closed. Previously the exception was discarded, so a
  failed run was indistinguishable from a normal end of data. The handler
  returns nil so that nothing extra is put on `out-chan`."
  [simulated-transitions-data-dir out-chan]
  (let [files   (sort-by #(.getName %) (file-list simulated-transitions-data-dir))
        in-chan (a/to-chan!! files)]
    (a/pipeline-blocking
     3
     out-chan
     nippy->data-xf
     in-chan
     true
     (fn [e]
       ;; Surface the failure instead of silently truncating the stream.
       (binding [*out* *err*]
         (println "async-nippy->data: error while reading"
                  simulated-transitions-data-dir
                  "- closing channels:"
                  e))
       (a/close! out-chan)
       (a/close! in-chan)
       ;; Any non-nil return value would be placed on out-chan.
       nil))
    out-chan))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment