Created August 8, 2017 14:18
(ns cron.meteo
[clj-time.core :as t]
[clj-time.periodic :as tp]
[chime :refer [chime-at]]
[mount.core :refer [defstate]]
[monger.collection :as mc]
[monger.query :as mq]
[mlib.conf :refer [conf]]
[mlib.core :refer [to-int to-float]]
[mlib.log :refer [debug info warn error]]
[bots.db :refer [db_meteo]]
[meteo.db :refer [DAT_COLL HOURS_COLL]]))
(def ONE_HOUR (t/hours 1))
{ :_id :oid
:hour "date rounded to hours"
:st "station_id"
:t [:min :max :avg]
:p [:min :max :avg]
:h [:min :max :avg]
:w [:min :max :avg]
:b "int: 0-359, 360"
:wt :avg
:wl :avg})
(defn create-indexes [db]
(mc/create-index db HOURS_COLL
(array-map :hour 1 :st 1) {:unique true})
(catch Exception e
(warn "create-indexes:" e))))
(defstate indexes
(create-indexes (db_meteo)))
(defn interval-fetch [t0 t1]
(mc/find-maps (db_meteo) DAT_COLL
{:ts {:$gte t0 :$lt t1}})
(catch Exception e
(warn "interval-fetch:" e))))
(defn get-last-hour []
(mq/with-collection (db_meteo) HOURS_COLL
(mq/find {})
(mq/fields [:hour])
(mq/sort {:hour -1})
(mq/limit 1))
(catch Exception e
(warn "get-last-hour:" e))))
(defn update-st-hour [st hour data]
(debug "st-hour:" st hour data)
(when data
(mc/update (db_meteo) HOURS_COLL
{:st st :hour hour}
(assoc data :st st :hour hour)
{:upsert true})
(= 1))
(catch Exception e
(warn "update-st-hour:" st hour data e)))))
(defn calc-avg [mmac]
(let [{avs :avs cnt :cnt} mmac]
(if cnt
(dissoc mmac :avs :cnt)
:avg (/ (float avs) cnt))
(defn min-max-avg [data]
(when-let [v0 (first data)]
(->> (next data)
(fn [mmac val]
(when val
{ :min (min (:min mmac) val)
:max (max (:max mmac) val)
:avs (+ (:avs mmac) val)
:cnt (inc (:cnt mmac))}))
{:min v0 :max v0 :avs v0 :cnt 1})
(defn calc-w [ws gs]
(if-let [gm (:max gs)]
(if-let [wm (:max ws)]
(assoc ws :max (max wm gm))
(assoc ws :max gm))
(defn calc-b [bs]
(when (seq bs)
(let [low (apply min bs)
hi (apply max bs)]
(when (< hi 360)
(if (>= 90 (- hi low))
(/ (+ hi low) 2))
(let [h2 (+ 360 low)]
(when (>= 90 (- h2 hi))
(rem (/ (+ h2 hi) 2) 360)))))))))
(defn numf [key]
(fn [d]
(when-let [v (get d key)]
(and (number? v) v)
(to-int v)
(to-float v)))))
(defn st-aggregate [st data]
(let [tph
(for [k [:t :p :h :wt :wl]
:let [mma (min-max-avg (keep (numf k) data))]
:when mma]
[k mma])
w (calc-w
(min-max-avg (keep (numf :w) data))
(min-max-avg (keep (numf :g) data)))
b (let [max_w (:max w)]
(when (and max_w (< 0 max_w))
[b (calc-b (keep #(when-let [b (:b %)] b) data))]
{:avg b})))]
(into {}
(filter second
(conj tph [:w w] [:b b]))))
(catch Exception e
(warn "st-aggregate:" st data e))))
;;; ;;; ;;; ;;; ;;;
(defn calc-hour [t0 t1]
(if-let [data (not-empty (interval-fetch t0 t1))]
(doseq [[st st_vals] (group-by :st data)]
(update-st-hour st t0
(st-aggregate st st_vals)))
(info "calc-hour: no data - " t0 t1)))
(defn worker [this-hour]
(if-let [last-hour (get-last-hour)]
(loop [t0 (t/minus (t/floor last-hour t/hour) ONE_HOUR)]
(info "worker last-hour:" last-hour)
(when (t/before? t0 this-hour)
(let [t1 (t/plus t0 ONE_HOUR)]
(calc-hour t0 t1)
(recur t1))))
(error "worker: unable to get last hour")))
(defn start [cnf]
(let [offset (t/seconds (:offset cnf))
interval (t/seconds (:interval cnf))
pseq (tp/periodic-seq
(t/plus (t/floor (t/now) t/hour) offset)
(info "params:" offset interval)
(chime-at pseq worker)))
(defn stop [cron]
(when cron
(cron))) ;; stop it
(defstate cron
(if-let [cnf (-> conf :cron :meteo)]
(start cnf)
false) ;; cron disabled in config
(stop cron))
