Last active
May 3, 2016 14:36
-
-
Save gardnervickers/3bb5c274b544802ec168c3bb81a5010b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn segments
  "Returns an infinite lazy sequence of segment maps.

  With no arguments the sequence starts at {:event-time 1 :name \"mike\"}.
  With a seed map `n`, the sequence starts at `n`; every following element is
  the previous one with :event-time incremented and :name replaced by a
  randomly chosen name."
  ([] (segments {:event-time 1 :name "mike"}))
  ([n]
   (lazy-seq
     ;; Build the successor first, then defer the rest of the sequence.
     (let [successor (assoc (update n :event-time inc)
                            :name (rand-nth ["gardner" "mike" "lucas"]))]
       (cons n (segments successor))))))
(defn wids-lower
  "Returns floor((t - min-windowing-attr) / w-slide) - 1 as a long.

  `wids` increments this value to obtain the first window id for `t`."
  [min-windowing-attr w-slide t]
  (let [offset (- t min-windowing-attr)
        slot   (Math/floor (/ offset w-slide))]
    (dec (long slot))))
(defn wids-upper
  "Returns floor((t + w-range - min-windowing-attr) / w-slide) - 1 as a long.

  `wids` increments this value to obtain the exclusive upper bound of the
  window ids for `t`."
  [min-windowing-attr w-range w-slide t]
  (let [offset (- (+ t w-range) min-windowing-attr)
        slot   (Math/floor (/ offset w-slide))]
    (dec (long slot))))
(defn wids
  "Returns the sequence of window ids for the value `t`.

  The ids run from (inc (wids-lower ...)) inclusive up to
  (inc (wids-upper ...)) exclusive."
  [min-windowing-attr w-range w-slide t]
  (let [first-wid (inc (wids-lower min-windowing-attr w-slide t))
        end-wid   (inc (wids-upper min-windowing-attr w-range w-slide t))]
    (range first-wid end-wid)))
(defn extent-lower
  "Returns the lower bound of window `w`:
  min-windowing-attr + w-slide * (w + 1) - w-range, clamped so that it is
  never smaller than min-windowing-attr."
  [min-windowing-attr w-range w-slide w]
  (let [window-end (+ min-windowing-attr (* w-slide (inc w)))
        candidate  (- window-end w-range)]
    (max min-windowing-attr candidate)))
(defn extent-upper
  "Returns the inclusive upper bound of window `w`:
  min-windowing-attr + w-slide * (w + 1) - 1."
  [min-windowing-attr w-slide w]
  (let [window-end (+ min-windowing-attr (* w-slide (inc w)))]
    (dec window-end)))
(defn extents
  "Returns every integer value covered by window `w`, from
  (extent-lower ...) through (extent-upper ...) inclusive."
  [min-windowing-attr w-range w-slide w]
  (let [from (extent-lower min-windowing-attr w-range w-slide w)
        to   (extent-upper min-windowing-attr w-slide w)]
    ;; `range` excludes its end, so bump the inclusive upper bound by one.
    (range from (inc to))))
(defn trigger-every
  "Returns a stateful transducer that passes every input through unchanged
  and, after each `n`-th input, additionally emits the value `trigger`
  downstream.

  n       - number of inputs between trigger emissions
  trigger - the value injected into the stream after every n-th input

  If the downstream reducing function signals early termination (returns a
  `reduced` value) for the n-th input, the trigger is not emitted and the
  reduced value is propagated as is."
  [n trigger]
  (fn [rf]
    ;; Per-reduction counter; a volatile is sufficient for transducer state.
    (let [cnt (volatile! 0)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (= (vswap! cnt inc) n)
           (do (vreset! cnt 0)
               (let [r (rf result input)]
                 ;; Never feed a `reduced` wrapper back into rf as an
                 ;; accumulator; stop here when downstream is finished.
                 (if (reduced? r)
                   r
                   (rf r trigger))))
           (rf result input)))))))
(defn windowing
  "Returns a stateful transducer that aggregates inputs into windows and
  emits the window state when a trigger arrives.

  agg-fn         - called as (agg-fn current-aggregate input) to fold an
                   input into the aggregate stored for a window/session
  event-time-key - key used to read the event time from an input
  range, slide   - passed to `wids` (with a minimum of 0) to compute the
                   window ids for an input's event time
  session-key    - key used to read the session value from an input

  Inputs that are not vectors are aggregated under [window-id session] in an
  internal map and produce no downstream output. Vector inputs are treated as
  triggers of the form [command extant op]:
    [_ :all :discarding]   emits the whole window map and clears it
    [_ :all :accumulating] emits the whole window map and keeps it
  Any other trigger is reported with println and ignored.

  On completion the remaining windows are emitted one at a time as
  single-entry maps {window-id sessions}, the state is cleared, and the
  downstream completion step is invoked."
  ;; NOTE: the `range` parameter shadows clojure.core/range inside this fn.
  [agg-fn event-time-key range slide session-key]
  (fn [rf]
    (let [window (atom {})]
      (fn
        ([] (rf))
        ([result]
         (let [w @window]
           (reset! window {})
           ;; `reduce` unwraps a `reduced` returned by rf, so the flushed
           ;; accumulator is a plain value; the transducer contract then
           ;; requires calling the downstream completion arity exactly once.
           (rf (reduce (fn [acc [k v]]
                         (rf acc {k v}))
                       result
                       w))))
        ([result input]
         (if (vector? input)
           ;; Trigger message: the first element is not inspected.
           (let [[_ extant op] input
                 w @window]
             (cond
               (and (= extant :all)
                    (= op :discarding)) (do (reset! window {})
                                            (rf result w))
               (and (= extant :all)
                    (= op :accumulating)) (rf result w)
               :else (do (println "unknown trigger type" input)
                         result)))
           ;; Regular input: fold it into every window it belongs to.
           (let [event-time (get input event-time-key)
                 session (get input session-key)
                 wid (wids 0 range slide event-time)
                 placements (map (fn [w] [w session]) wid)]
             (doseq [placement placements]
               (swap! window update-in placement agg-fn input))
             result)))))))
(comment "This example prints windows in order, lazily")

;; Feed the infinite segment stream through the trigger and windowing
;; transducers and realize only the first emitted window map.
(->> (segments)
     (eduction (comp (trigger-every 10 [:trigger :all :accumulating])
                     (windowing conj :event-time 5 5 :_)))
     (take 1))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment