Skip to content

Instantly share code, notes, and snippets.

@bmabey
Last active December 25, 2015 02:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmabey/6904565 to your computer and use it in GitHub Desktop.
Save bmabey/6904565 to your computer and use it in GitHub Desktop.
(ns rbl.utils.async
(:require [clojure.core.async :refer :all]
[clojure.tools.macro :refer [macrolet]]))
(defn batch-until-pause
"Batches values from the `in` channel into a single a vec of values up until no
messages have appeared in `timeout-pause` milliseconds. `max-batch-time` is the
number of milliseconds to wait overall before sending a batch."
([timeout-pause max-batch-time in]
(batch-until-pause timeout-pause max-batch-time in (chan)))
([timeout-pause max-batch-time in out]
(go
(loop [batch []
batch-timeout (timeout max-batch-time)]
(macrolet [(send-batch! [] '(when (seq batch) (>! out batch)))]
(let [[val port] (alts! [in (timeout timeout-pause) batch-timeout])]
(cond
;; batch timed out, send what we have now and start a new batch timeout
(= port batch-timeout) (do
(send-batch!)
(recur [] (timeout max-batch-time)))
;; we got a value, add it to the batch
(and (= port in) val) (recur (conj batch val) batch-timeout)
;; the in channel was closed, send our batch
(= port in) (send-batch!)
;; a timeout happened, send the batch and start a new one
:else (do
(send-batch!)
(recur [] batch-timeout))))))
(close! out))
out))
(require 'midje.sweet)
(defn chan->seq
"Drains a channel into a seq."
[channel]
(go
(loop [coll []]
(if-let [val (<! channel)]
(recur (conj coll val))
coll))))
(defn eager-chan->seq
"Drains a channel into a seq in a blocking fashion on current thread."
[channel]
(<!! (chan->seq channel)))
(defn coll->chan
"Realizes the entire coll/seq and puttting the values on the provided channel"
([coll chan]
(go
(doseq [val coll]
(>! chan val)))))
(facts "#'batch-until-pause"
(let [generator (fn [channel batches pause]
(go
(doseq [batch batches]
(<! (coll->chan batch channel))
(<! (timeout pause)))
(close! channel)))
msg-channel (chan 5)
batches [[{:a 1} {:b 2}]
[{:c 3} {:d 4} {:e 5}]
[{:f 6}]]]
(generator msg-channel batches 50) => anything
(->> msg-channel
(batch-until-pause 30 100)
eager-chan->seq) => batches)
;; this passes but is too brittle with the timeouts..
;; TODO: redef the timeout fn with a virtual timer...
#_(fact "it cuts off a batch at max-batch-time"
(let [generator (fn [channel msgs pause]
(go
(<! (coll->chan-whith-pause msgs channel pause))
(close! channel)))
msg-channel (chan 5)
msgs [{:a 1} {:b 2} {:c 3} {:d 4} {:e 5} {:f 6}]]
(generator msg-channel msgs 5) => anything
(->> msg-channel
(batch-until-pause 30 17)
eager-chan->seq) => [[{:a 1} {:b 2}]
[{:c 3} {:d 4} ]
[{:e 5} {:f 6}]])))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment