@raymcdermott
Last active June 6, 2018 14:12
core.async filtering by time
(ns green-eggs.core
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! timeout onto-chan]]
            [clj-time.core :as t]
            [clj-time.coerce :refer [from-date]]))
;; If you were to implement this yourself, you could write a `go` block that pulls items from one channel
;; and puts each onto another as a `[timestamp item]` pair. That channel then feeds a buffered `chan`
;; (core.async requires a buffer when a transducer is supplied) whose transducer filters on the age of the timestamp.
(def green-eggs-n-ham
  ["in the rain"
   "on a train"
   "in a box"
   "with a fox"
   "in a house"
   "with a mouse"
   "here or there"
   "anywhere"])
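(defn delaying-timestamper
  "Captures the current time, blocks for `millis` ms, then returns the captured time.
  Used to space out timestamps when testing the window. Note that Thread/sleep
  blocks the calling thread, so inside a go block it ties up a go-pool thread."
  [millis]
  (let [now (t/now)]
    (Thread/sleep millis)
    now))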
(defn add-timestamp [now-provider item]
  (let [now (now-provider)]
    [now item]))
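(defn time-stamper
  "Applies `timestamper` to each item read from `in` and puts the result on the returned channel."
  [timestamper in]
  (let [out (chan)]
    (a/go-loop []
      (if-let [item (<! in)]
        (do
          (when-let [enriched (timestamper item)]
            (>! out enriched))
          (recur))
        ;; `in` is closed: stop looping (instead of spinning on nil) and close `out`
        (close! out)))
    out))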
; for testing the window
(def add-timestamp-with-delay (partial add-timestamp (partial delaying-timestamper 200)))
(def add-timestamp-no-delay (partial add-timestamp t/now))
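(defn window
  "Returns the item of a `[time item]` pair when `time` falls within
  `window-seconds` of `window-open`, otherwise nil."
  [window-seconds window-open elem]
  (let [[time item] elem
        window-close (t/plus window-open (t/seconds window-seconds))
        window (t/interval window-open window-close)]
    (when (t/within? window time)
      item)))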
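;; `(t/now)` is evaluated once, when this form is loaded, so the one-second window opens at load time
(def time-window (partial window 1 (t/now)))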
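;; A quick REPL check of `window`, as a sketch rather than part of the pipeline.
;; The sample items and offsets are illustrative assumptions. It relies on
;; `t/within?` treating the interval start as inclusive, so a timestamp equal to
;; the opening instant passes, while one two seconds later does not.
(comment
  (let [open (t/now)]
    [(window 1 open [open "in a box"])                            ; inside the window
     (window 1 open [(t/plus open (t/seconds 2)) "with a fox"])]) ; after the window has closed
  ;; => ["in a box" nil]
  )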
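(defn window-filter
  "Passes on only those items from `in` whose timestamp falls inside `time-window`."
  [in]
  (let [out (chan)]
    (a/go-loop []
      (if-let [elem (<! in)]
        (do
          (when-let [item (time-window elem)]
            (>! out item))
          (recur))
        ;; `in` is closed: do not hand nil to `time-window`; close `out` and stop
        (close! out)))
    out))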
(defn simple-printer [in]
  (a/go-loop []
    ;; stop once `in` is closed rather than looping forever on nil
    (when-let [item (<! in)]
      (println item)
      (recur))))
(def in-chan (chan))
(def standard-channel (time-stamper add-timestamp-with-delay in-chan))
(def windowed (window-filter standard-channel))
(simple-printer windowed)
(onto-chan in-chan green-eggs-n-ham)
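
;; A minimal sketch of the transducer variant mentioned in the comment near the top,
;; offered as one possible shape rather than the definitive implementation.
;; `within-window?` and `windowed-chan` are hypothetical names introduced here, and the
;; buffer size is an arbitrary assumption (a channel with a transducer must be buffered).
(defn within-window?
  "Hypothetical helper: true when the timestamp of a `[timestamp item]` pair
  lies within `window-seconds` of `window-open`."
  [window-seconds window-open [ts _]]
  (t/within? (t/interval window-open (t/plus window-open (t/seconds window-seconds)))
             ts))

(defn windowed-chan
  "Hypothetical helper: a buffered channel that drops pairs outside the window
  and strips the timestamp from the pairs it keeps."
  [buf-size window-seconds window-open]
  (chan buf-size
        (comp (filter (partial within-window? window-seconds window-open))
              (map second))))

;; Usage at the REPL: only the pair stamped inside the window survives.
(comment
  (let [open (t/now)
        c    (windowed-chan 10 1 open)]
    (>!! c [open "in a box"])
    (>!! c [(t/plus open (t/seconds 5)) "with a fox"])
    (close! c)
    (<!! (a/into [] c)))
  ;; => ["in a box"]
  )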