Created
August 8, 2017 14:18
-
-
Save maxp/6419daf250835a19821bcee327bb148e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cron.meteo | |
(:require | |
[clj-time.core :as t] | |
[clj-time.periodic :as tp] | |
[chime :refer [chime-at]] | |
[mount.core :refer [defstate]] | |
[monger.collection :as mc] | |
[monger.query :as mq] | |
; | |
[mlib.conf :refer [conf]] | |
[mlib.core :refer [to-int to-float]] | |
[mlib.log :refer [debug info warn error]] | |
; | |
[bots.db :refer [db_meteo]] | |
[meteo.db :refer [DAT_COLL HOURS_COLL]])) | |
; | |
(def ONE_HOUR (t/hours 1)) | |
(comment | |
{ :_id :oid | |
:hour "date rounded to hours" | |
:st "station_id" | |
:t [:min :max :avg] | |
:p [:min :max :avg] | |
:h [:min :max :avg] | |
:w [:min :max :avg] | |
:b "int: 0-359, 360" | |
:wt :avg | |
:wl :avg}) | |
; | |
(defn create-indexes [db] | |
(try | |
(mc/create-index db HOURS_COLL | |
(array-map :hour 1 :st 1) {:unique true}) | |
(catch Exception e | |
(warn "create-indexes:" e)))) | |
; | |
(defstate indexes | |
:start | |
(create-indexes (db_meteo))) | |
; | |
(defn interval-fetch [t0 t1] | |
(try | |
(mc/find-maps (db_meteo) DAT_COLL | |
{:ts {:$gte t0 :$lt t1}}) | |
(catch Exception e | |
(warn "interval-fetch:" e)))) | |
; | |
(defn get-last-hour [] | |
(try | |
(-> | |
(mq/with-collection (db_meteo) HOURS_COLL | |
(mq/find {}) | |
(mq/fields [:hour]) | |
(mq/sort {:hour -1}) | |
(mq/limit 1)) | |
(first) | |
(:hour)) | |
(catch Exception e | |
(warn "get-last-hour:" e)))) | |
; | |
(defn update-st-hour [st hour data] | |
(debug "st-hour:" st hour data) | |
(when data | |
(try | |
(-> | |
(mc/update (db_meteo) HOURS_COLL | |
{:st st :hour hour} | |
(assoc data :st st :hour hour) | |
{:upsert true}) | |
(.getN) | |
(= 1)) | |
(catch Exception e | |
(warn "update-st-hour:" st hour data e))))) | |
; | |
(defn calc-avg [mmac] | |
(let [{avs :avs cnt :cnt} mmac] | |
(if cnt | |
(assoc | |
(dissoc mmac :avs :cnt) | |
:avg (/ (float avs) cnt)) | |
;; | |
mmac))) | |
; | |
(defn min-max-avg [data] | |
(when-let [v0 (first data)] | |
(->> (next data) | |
(reduce | |
(fn [mmac val] | |
(when val | |
{ :min (min (:min mmac) val) | |
:max (max (:max mmac) val) | |
:avs (+ (:avs mmac) val) | |
:cnt (inc (:cnt mmac))})) | |
{:min v0 :max v0 :avs v0 :cnt 1}) | |
(calc-avg)))) | |
; | |
(defn calc-w [ws gs] | |
(if-let [gm (:max gs)] | |
(if-let [wm (:max ws)] | |
(assoc ws :max (max wm gm)) | |
(assoc ws :max gm)) | |
ws)) | |
; | |
(defn calc-b [bs] | |
(when (seq bs) | |
(let [low (apply min bs) | |
hi (apply max bs)] | |
(when (< hi 360) | |
(if (>= 90 (- hi low)) | |
(int | |
(/ (+ hi low) 2)) | |
;; | |
(let [h2 (+ 360 low)] | |
(when (>= 90 (- h2 hi)) | |
(int | |
(rem (/ (+ h2 hi) 2) 360))))))))) | |
; | |
(defn numf [key] | |
(fn [d] | |
(when-let [v (get d key)] | |
(or | |
(and (number? v) v) | |
(to-int v) | |
(to-float v))))) | |
; | |
(defn st-aggregate [st data] | |
(try | |
(let [tph | |
(for [k [:t :p :h :wt :wl] | |
:let [mma (min-max-avg (keep (numf k) data))] | |
:when mma] | |
[k mma]) | |
w (calc-w | |
(min-max-avg (keep (numf :w) data)) | |
(min-max-avg (keep (numf :g) data))) | |
b (let [max_w (:max w)] | |
(when (and max_w (< 0 max_w)) | |
(when-let | |
[b (calc-b (keep #(when-let [b (:b %)] b) data))] | |
{:avg b})))] | |
(into {} | |
(filter second | |
(conj tph [:w w] [:b b])))) | |
(catch Exception e | |
(warn "st-aggregate:" st data e)))) | |
; | |
;;; ;;; ;;; ;;; ;;; | |
(defn calc-hour [t0 t1] | |
(if-let [data (not-empty (interval-fetch t0 t1))] | |
(doseq [[st st_vals] (group-by :st data)] | |
(update-st-hour st t0 | |
(st-aggregate st st_vals))) | |
;; | |
(info "calc-hour: no data - " t0 t1))) | |
; | |
(defn worker [this-hour] | |
(if-let [last-hour (get-last-hour)] | |
(loop [t0 (t/minus (t/floor last-hour t/hour) ONE_HOUR)] | |
(info "worker last-hour:" last-hour) | |
(when (t/before? t0 this-hour) | |
(let [t1 (t/plus t0 ONE_HOUR)] | |
(calc-hour t0 t1) | |
(recur t1)))) | |
;; | |
(error "worker: unable to get last hour"))) | |
; | |
(defn start [cnf] | |
(let [offset (t/seconds (:offset cnf)) | |
interval (t/seconds (:interval cnf)) | |
pseq (tp/periodic-seq | |
(t/plus (t/floor (t/now) t/hour) offset) | |
interval)] | |
(info "params:" offset interval) | |
(chime-at pseq worker))) | |
; | |
(defn stop [cron] | |
(when cron | |
(cron))) ;; stop it | |
; | |
(defstate cron | |
:start | |
(if-let [cnf (-> conf :cron :meteo)] | |
(start cnf) | |
false) ;; cron disabled in config | |
:stop | |
(stop cron)) | |
; | |
;;. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment