A core.async channel that drops "old" items: values whose age has reached max-age-millis by the time they are forwarded.
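(ns expiry-chan
  (:require [clojure.core.async :as async :refer [go >! <! chan]]))

(defn expiry-chan
  "Returns a channel that relays values from `in`, dropping any value whose
  age has reached `max-age-millis` by the time it is forwarded. The returned
  channel closes once `in` is closed and drained."
  [in max-age-millis]
  (let [;; Stamp each value with its arrival time as it enters the buffer.
        middler (chan 1024 (map #(vector (System/currentTimeMillis) %)))
        out     (chan)]
    (async/pipe in middler)
    (go (loop []
          (if-let [[ts value] (<! middler)]
            (do
              ;; Forward only values that are still fresh. Age is checked
              ;; when the value leaves the buffer, before the put on `out`.
              (when (< (- (System/currentTimeMillis) ts) max-age-millis)
                (>! out value))
              (recur))
            (async/close! out))))
    out))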
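(comment
  ;; The producer puts 1..9 at roughly 5 ms intervals; the reader takes one
  ;; value every 100 ms, so the later values should exceed the 500 ms limit
  ;; while buffered and be dropped.
  (let [in  (chan)
        out (expiry-chan in 500)]
    (go (loop [x 1]
          (if (< x 10)
            (do
              (<! (async/timeout 5))
              (>! in x)
              (recur (inc x)))
            (async/close! in))))
    (async/<!! (go (loop [result []]
                     (<! (async/timeout 100)) ; simulate slow reader
                     (if-let [v (<! out)]
                       (recur (conj result v))
                       result)))))
  )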