Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@gardnervickers
Last active May 3, 2016 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gardnervickers/3bb5c274b544802ec168c3bb81a5010b to your computer and use it in GitHub Desktop.
Save gardnervickers/3bb5c274b544802ec168c3bb81a5010b to your computer and use it in GitHub Desktop.
(defn segments
  "Returns an infinite lazy sequence of segment maps.

  With no arguments the sequence starts from {:event-time 1 :name \"mike\"}.
  With a starting segment `n`, the sequence yields `n` first; every following
  segment is the previous one with :event-time incremented and :name replaced
  by a random pick from a fixed list of three names."
  ([]
   (segments {:event-time 1 :name "mike"}))
  ([n]
   (lazy-seq
     ;; Build the successor segment first, then defer the rest of the
     ;; sequence to the recursive (lazy) call.
     (let [successor (assoc (update n :event-time inc)
                            :name (rand-nth ["gardner" "mike" "lucas"]))]
       (cons n (segments successor))))))
(defn wids-lower
  "Returns floor((t - min-windowing-attr) / w-slide) - 1 as a long.

  `wids` increments this value to obtain the first window id that
  contains the time `t`."
  [min-windowing-attr w-slide t]
  (-> (- t min-windowing-attr)
      (/ w-slide)
      Math/floor
      long
      dec))
(defn wids-upper
  "Returns floor((t + w-range - min-windowing-attr) / w-slide) - 1 as a long.

  `wids` increments this value to obtain the exclusive upper bound of the
  window ids that contain the time `t`."
  [min-windowing-attr w-range w-slide t]
  (let [shifted (- (+ t w-range) min-windowing-attr)
        slot    (Math/floor (/ shifted w-slide))]
    (dec (long slot))))
(defn wids
  "Returns the sequence of window ids that the time `t` falls into, for
  windows of length `w-range` that start every `w-slide` units, counted
  from `min-windowing-attr`.

  The ids run from (inc (wids-lower ...)) inclusive to
  (inc (wids-upper ...)) exclusive."
  [min-windowing-attr w-range w-slide t]
  (let [first-id (inc (wids-lower min-windowing-attr w-slide t))
        end-id   (inc (wids-upper min-windowing-attr w-range w-slide t))]
    (range first-id end-id)))
(defn extent-lower
  "Returns the smallest time value covered by window id `w`.

  Computed as min-windowing-attr + w-slide * (w + 1) - w-range, clamped so
  it never drops below `min-windowing-attr`."
  [min-windowing-attr w-range w-slide w]
  (let [window-end   (+ min-windowing-attr (* w-slide (inc w)))
        window-start (- window-end w-range)]
    (max min-windowing-attr window-start)))
(defn extent-upper
  "Returns the largest time value covered by window id `w`:
  min-windowing-attr + w-slide * (w + 1) - 1."
  [min-windowing-attr w-slide w]
  (let [window-end (+ min-windowing-attr (* w-slide (inc w)))]
    (dec window-end)))
(defn extents
  "Returns the sequence of time values covered by window id `w`, from
  (extent-lower ...) through (extent-upper ...) inclusive."
  [min-windowing-attr w-range w-slide w]
  (let [start (extent-lower min-windowing-attr w-range w-slide w)
        stop  (inc (extent-upper min-windowing-attr w-slide w))]
    (range start stop)))
(defn trigger-every
  "Returns a stateful transducer that passes every input downstream
  unchanged and, right after each `n`-th input, additionally sends
  `trigger` downstream as an extra item.

  n       - number of inputs between triggers; compared with `=` against
            an integer counter that starts at 0, so a value the counter
            never equals (e.g. 0 or a negative number) never fires.
  trigger - the value to inject after every `n`-th input.

  If the downstream reducing function signals early termination
  (`reduced`) while handling the `n`-th input, the trigger is not sent and
  the reduced value is returned as is."
  [n trigger]
  (fn [rf]
    ;; Per-reduction counter; volatile! is the cell clojure.core's own
    ;; stateful transducers use for this purpose.
    (let [seen (volatile! 0)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (let [fire? (= (vswap! seen inc) n)]
           (when fire?
             (vreset! seen 0))
           (let [out (rf result input)]
             ;; Never step again on a reduced accumulator: downstream has
             ;; asked to stop, and `out` is a Reduced wrapper rather than
             ;; a real accumulator value.
             (if (and fire? (not (reduced? out)))
               (rf out trigger)
               out))))))))
(defn windowing
  "Returns a stateful transducer that aggregates segments into windows and
  emits the window state when it receives a trigger.

  agg-fn         - called as (agg-fn current-aggregate segment) to fold a
                   segment into a window; current-aggregate is nil the
                   first time a window/session slot is touched.
  event-time-key - key used to read a segment's event time.
  range          - window length, passed to `wids`.
  slide          - window slide, passed to `wids`.
  session-key    - key used to read a segment's session value.

  State is a nested map {window-id {session-value aggregate}}.

  Inputs are handled as follows:
  * A non-vector input is treated as a segment: it is aggregated into every
    window id returned by (wids 0 range slide event-time) and nothing is
    sent downstream.
  * A vector input is treated as a trigger of the form [command extent op].
    [_ :all :discarding] sends the whole state map downstream and clears
    it; [_ :all :accumulating] sends the whole state map downstream and
    keeps it. Any other vector is reported with println and ignored.

  On completion, the remaining state is flushed downstream as one
  single-entry map {window-id sessions} per window, and then the downstream
  completion step is invoked."
  [agg-fn event-time-key range slide session-key]
  (fn [rf]
    (let [window (volatile! {})]
      (fn
        ([] (rf))
        ([result]
         (let [w @window]
           (vreset! window {})
           ;; `reduce` stops at, and unwraps, a `reduced` value returned
           ;; by the step below, so the flush result is always a plain
           ;; accumulator. Downstream completion must then be called once
           ;; so that later stateful transducers or a finalising reducing
           ;; function get their chance to finish.
           (rf (reduce (fn [acc [k v]] (rf acc {k v})) result w))))
        ([result input]
         (if-not (vector? input)
           (let [event-time (get input event-time-key)
                 session    (get input session-key)]
             ;; A segment may belong to several overlapping windows.
             (doseq [w (wids 0 range slide event-time)]
               (vswap! window update-in [w session] agg-fn input))
             result)
           (let [[_ extent op] input
                 w             @window]
             (cond
               (and (= extent :all) (= op :discarding))
               (do (vreset! window {})
                   (rf result w))

               (and (= extent :all) (= op :accumulating))
               (rf result w)

               :else
               (do (println "unknown trigger type" input)
                   result)))))))))
;; Example pipeline: feed the generated segments through a trigger that
;; fires after every 10th segment, window them by :event-time with range 5
;; and slide 5, and take the first emitted window state.
(comment "This example prints windows in order, lazily")
(->> (segments)
     (eduction (trigger-every 10 [:trigger :all :accumulating])
               (windowing conj :event-time 5 5 :_))
     (take 1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment