Last active
December 25, 2015 02:39
-
-
Save bmabey/6904565 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns rbl.utils.async | |
(:require [clojure.core.async :refer :all] | |
[clojure.tools.macro :refer [macrolet]])) | |
(defn batch-until-pause | |
"Batches values from the `in` channel into a single a vec of values up until no | |
messages have appeared in `timeout-pause` milliseconds. `max-batch-time` is the | |
number of milliseconds to wait overall before sending a batch." | |
([timeout-pause max-batch-time in] | |
(batch-until-pause timeout-pause max-batch-time in (chan))) | |
([timeout-pause max-batch-time in out] | |
(go | |
(loop [batch [] | |
batch-timeout (timeout max-batch-time)] | |
(macrolet [(send-batch! [] '(when (seq batch) (>! out batch)))] | |
(let [[val port] (alts! [in (timeout timeout-pause) batch-timeout])] | |
(cond | |
;; batch timed out, send what we have now and start a new batch timeout | |
(= port batch-timeout) (do | |
(send-batch!) | |
(recur [] (timeout max-batch-time))) | |
;; we got a value, add it to the batch | |
(and (= port in) val) (recur (conj batch val) batch-timeout) | |
;; the in channel was closed, send our batch | |
(= port in) (send-batch!) | |
;; a timeout happened, send the batch and start a new one | |
:else (do | |
(send-batch!) | |
(recur [] batch-timeout)))))) | |
(close! out)) | |
out)) | |
(require 'midje.sweet) | |
(defn chan->seq | |
"Drains a channel into a seq." | |
[channel] | |
(go | |
(loop [coll []] | |
(if-let [val (<! channel)] | |
(recur (conj coll val)) | |
coll)))) | |
(defn eager-chan->seq | |
"Drains a channel into a seq in a blocking fashion on current thread." | |
[channel] | |
(<!! (chan->seq channel))) | |
(defn coll->chan | |
"Realizes the entire coll/seq and puttting the values on the provided channel" | |
([coll chan] | |
(go | |
(doseq [val coll] | |
(>! chan val))))) | |
(facts "#'batch-until-pause" | |
(let [generator (fn [channel batches pause] | |
(go | |
(doseq [batch batches] | |
(<! (coll->chan batch channel)) | |
(<! (timeout pause))) | |
(close! channel))) | |
msg-channel (chan 5) | |
batches [[{:a 1} {:b 2}] | |
[{:c 3} {:d 4} {:e 5}] | |
[{:f 6}]]] | |
(generator msg-channel batches 50) => anything | |
(->> msg-channel | |
(batch-until-pause 30 100) | |
eager-chan->seq) => batches) | |
;; this passes but is too brittle with the timeouts.. | |
;; TODO: redef the timeout fn with a virtual timer... | |
#_(fact "it cuts off a batch at max-batch-time" | |
(let [generator (fn [channel msgs pause] | |
(go | |
(<! (coll->chan-whith-pause msgs channel pause)) | |
(close! channel))) | |
msg-channel (chan 5) | |
msgs [{:a 1} {:b 2} {:c 3} {:d 4} {:e 5} {:f 6}]] | |
(generator msg-channel msgs 5) => anything | |
(->> msg-channel | |
(batch-until-pause 30 17) | |
eager-chan->seq) => [[{:a 1} {:b 2}] | |
[{:c 3} {:d 4} ] | |
[{:e 5} {:f 6}]]))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment