@gerritjvv
Last active January 1, 2016 15:29
There are many times when you need to buffer up a series of results and then perform an operation on them and, if the count is not reached within a predefined timeout, perform the operation on whatever results have been collected so far. For my use case I'm writing a Kafka producer and want messages received by a send function to be buffered before they are sent to Kafka. Th…
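(require '[clojure.core.async :refer [go alts! >! <! >!! <!! chan timeout close!]])

(defn buffered-chan
  "Reads from ch-source and, once either buffer-count values have been read
   or timeout-ms has elapsed, sends the buffered values as a vector to the
   channel that is returned from this function. When ch-source is closed,
   the remaining values are flushed and the returned channel is closed."
  ([ch-source buffer-count timeout-ms]
   (buffered-chan ch-source buffer-count timeout-ms 1))
  ([ch-source buffer-count timeout-ms buffer-or-n]
   (let [ch-target (chan buffer-or-n)]
     (go
       (loop [buff [] t (timeout timeout-ms)]
         (let [[v port] (alts! [ch-source t])
               b (if (nil? v) buff (conj buff v))]
           (cond
             ;; nil from ch-source means it was closed: flush and stop,
             ;; otherwise this loop would spin forever on the closed channel
             (and (nil? v) (= port ch-source))
             (do (when (seq b) (>! ch-target b))
                 (close! ch-target))
             ;; the buffer is full or the timeout fired
             (or (>= (count b) buffer-count) (nil? v))
             (do (when (seq b) (>! ch-target b)) ;send only non-empty buffers
                 (recur [] (timeout timeout-ms))) ;create a new buffer and new timeout
             :else
             (recur b t))))) ;pass the new buffer and the current timeout
     ch-target)))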
;; test
(let [ch-source (chan)
      buff-ch (buffered-chan ch-source 10 50000 11)]
  (go
    (dotimes [i 100]
      (>! ch-source i)))
  (dotimes [i 10]
    (let [v (<!! buff-ch)]
      (prn "got " v))))
;; "got " [0 1 2 3 4 5 6 7 8 9]
;; "got " [10 11 12 13 14 15 16 17 18 19]
;; "got " [20 21 22 23 24 25 26 27 28 29]
;; "got " [30 31 32 33 34 35 36 37 38 39]
;; "got " [40 41 42 43 44 45 46 47 48 49]
;; "got " [50 51 52 53 54 55 56 57 58 59]
;; "got " [60 61 62 63 64 65 66 67 68 69]
;; "got " [70 71 72 73 74 75 76 77 78 79]
;; "got " [80 81 82 83 84 85 86 87 88 89]
;; "got " [90 91 92 93 94 95 96 97 98 99]
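
The test above only exercises the count-based flush. As a rough sketch of the producer use case from the description, the snippet below also lets a partial batch go out on the timeout and closes the source when done. It is an illustration under stated assumptions, not the author's implementation: `batch-chan` is a compact, hypothetical restatement of `buffered-chan` included only so the snippet runs on its own, and `send-batch!` is a hypothetical stand-in for the real Kafka send that merely records each batch in an atom.

```clojure
(require '[clojure.core.async :refer [go go-loop alts! >! <! >!! <!! chan timeout close!]])

;; Hypothetical, compact restatement of buffered-chan so this snippet is self-contained.
(defn batch-chan [ch-source n timeout-ms]
  (let [out (chan 1)]
    (go
      (loop [buff [] t (timeout timeout-ms)]
        (let [[v port] (alts! [ch-source t])]
          (cond
            ;; a value arrived: flush when the batch is full, else keep collecting
            (some? v)
            (let [b (conj buff v)]
              (if (>= (count b) n)
                (do (>! out b) (recur [] (timeout timeout-ms)))
                (recur b t)))
            ;; nil from ch-source means it was closed: flush the rest and stop
            (= port ch-source)
            (do (when (seq buff) (>! out buff))
                (close! out))
            ;; otherwise the timeout fired: flush a partial batch, restart the timer
            :else
            (do (when (seq buff) (>! out buff))
                (recur [] (timeout timeout-ms)))))))
    out))

;; Hypothetical stand-in for the real Kafka send: it only records each batch.
(def sent (atom []))
(defn send-batch! [batch] (swap! sent conj batch))

(let [ch-source (chan 100)
      send-msg  (fn [msg] (>!! ch-source msg)) ; what callers of the producer would use
      batches   (batch-chan ch-source 10 200)
      ;; consumer: hands every batch to send-batch! until batches is closed
      done      (go-loop []
                  (when-let [batch (<! batches)]
                    (send-batch! batch)
                    (recur)))]
  (dotimes [i 13] (send-msg i))
  (Thread/sleep 1000) ; longer than the 200 ms timeout, so the partial batch can flush
  (close! ch-source)  ; ends the batching loop, which in turn closes batches
  (<!! done))         ; wait for the consumer to finish

(prn @sent)
```

With these timings one would typically expect a full batch of ten followed by a partial batch of three, though the exact split depends on scheduling. Closing the source channel is what lets both go blocks terminate; without it they would stay parked waiting for more messages.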