Skip to content

Instantly share code, notes, and snippets.

@tcoupland
Last active August 29, 2015 14:10
Show Gist options
  • Save tcoupland/83c47d8ce2f78570a54c to your computer and use it in GitHub Desktop.
Save tcoupland/83c47d8ce2f78570a54c to your computer and use it in GitHub Desktop.
A channel modifier that creates batches of events either when a given number of events have been put into it or when a given time period has expired.
(defn partition-or-time
"Returns a channel that will either contain vectors of n items taken from ch or
if beat-rate millis elapses then a vector with the available items. The
final vector in the return channel may be smaller than n if ch closed before
the vector could be completely filled."
[n ch beat-rate buf-or-n]
(let [out (chan buf-or-n)]
(go (loop [arr (make-array Object n)
idx 0
beat (timeout beat-rate)]
(let [[v c] (alts! [ch beat])]
(if (= c beat)
(do
(if (> idx 0)
(do (>! out (vec (take idx arr)))
(recur (make-array Object n)
0
(timeout beat-rate)))
(recur arr idx (timeout beat-rate))))
(if-not (nil? v)
(do (aset ^objects arr idx v)
(let [new-idx (inc idx)]
(if (< new-idx n)
(recur arr new-idx beat)
(do (>! out (vec arr))
(recur (make-array Object n) 0 (timeout beat-rate))))))
(do (when (> idx 0)
(let [narray (make-array Object idx)]
(System/arraycopy arr 0 narray 0 idx)
(>! out (vec narray))))
(close! out)))))))
out))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment