@dazld
Last active Oct 14, 2018
accumulate values for processing in another thread
(ns batches.core
  (:require [clojure.core.async :as a]))

(defprotocol Accumulating
  (add [this v])
  (stop [this]))

(defn accumulate
  "Every time-ms milliseconds, invoke action with a vector of the values
  conj'd onto the store with add since the previous invocation. Cancel
  further invocations by calling stop, e.g. (stop (accumulate identity 1000))."
  [action time-ms]
  (let [stop-ch (a/chan 1)
        push-ch (a/chan)]
    ;; Each batch gets its own timeout channel, so nothing keeps running after stop.
    (a/go-loop [vals []
                tick (a/timeout time-ms)]
      ;; :priority true tries the channels in order: push-ch, tick, stop-ch.
      (let [[v ch] (a/alts! [push-ch tick stop-ch] :priority true)]
        (condp = ch
          push-ch (recur (conj vals v) tick)
          ;; action runs inside the go block, so it should not block.
          tick    (do
                    (action vals)
                    (recur [] (a/timeout time-ms)))
          ;; Values still pending at stop are not passed to action.
          stop-ch vals)))
    (reify
      Accumulating
      (add [_ v] (a/put! push-ch v))
      (stop [_] (a/put! stop-ch :stop)))))
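
;; A minimal usage sketch, assuming the definitions above are loaded at a REPL.
;; The names seen and acc are hypothetical, and the 500 ms period is arbitrary.
(comment
  ;; Record every batch that is handed to action.
  (def seen (atom []))
  (def acc (accumulate #(swap! seen conj %) 500))

  (add acc 1)
  (add acc 2)

  ;; Once at least one period has elapsed, @seen should include the batch
  ;; [1 2]; periods in which nothing was added contribute empty vectors.
  @seen

  ;; Cancel further invocations of action.
  (stop acc))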