Channel that excludes "old" items (defined by max-age-millis)
(ns expiry-chan | |
(:require [clojure.core.async :as async :refer [go >! <! chan]])) | |
(defn expiry-chan [in max-age-millis] | |
(let [middler (chan 1024 (map #(vector (System/currentTimeMillis) %))) | |
out (chan)] | |
(async/pipe in middler) | |
(go (loop [] | |
(if-let [[ts value] (<! middler)] | |
(do | |
(when (< (- (System/currentTimeMillis) ts) max-age-millis) | |
(>! out value)) | |
(recur)) | |
(async/close! out)))) | |
out)) | |
(comment | |
(let [in (chan) | |
out (expiry-chan in 500)] | |
(go (loop [x 1] | |
(if (< x 10) | |
(do | |
(<! (async/timeout 5)) | |
(>! in x) | |
(recur (inc x))) | |
(async/close! in)))) | |
(async/<!! (go (loop [result []] | |
(<! (async/timeout 100)) ; simulate slow reader | |
(if-let [v (<! out)] | |
(recur (conj result v)) | |
result))))) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment