Skip to content

Instantly share code, notes, and snippets.

@dazld
Last active October 14, 2018 13:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dazld/424fe19380f1a1227e0024af319b758d to your computer and use it in GitHub Desktop.
Save dazld/424fe19380f1a1227e0024af319b758d to your computer and use it in GitHub Desktop.
accumulate values for processing in another thread
(ns batches.core
(:require [clojure.core.async :as a]))
(defprotocol Accumulating
(add [this v])
(stop [this]))
(defn accumulate
"Periodically invoke action with values, which can be conj'd onto the store with add. Allows the invocation to be cancelled´´
by calling (stop (accumulate identity))"
[action time-ms]
(let [stop (a/chan 1)
push (a/chan)
timer (a/chan)
_ (a/go-loop []
(a/<! (a/timeout time-ms))
(a/>! timer :go)
(recur))]
(a/go-loop [vals []]
(let [[v ch] (a/alts! [push timer stop] :priority true)]
(condp = ch
push (recur (conj vals v))
timer (do
(action vals)
(recur []))
stop vals)))
(reify
Accumulating
(add [_ v] (a/put! push v))
(stop [_] (a/put! stop :stop)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment