Skip to content

Instantly share code, notes, and snippets.

@noisesmith
Last active February 7, 2017 01:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noisesmith/abec1d9a612c5a61bde8c7fff4ba8293 to your computer and use it in GitHub Desktop.
Save noisesmith/abec1d9a612c5a61bde8c7fff4ba8293 to your computer and use it in GitHub Desktop.
using core.async to combine data into chunks with a timeout
(ns noisesmith.chunker
(:require [clojure.core.async :refer [alts! <! >! timeout go-loop]]))
(defn launch-chunks
[chunk-size time-out rq-chan result-chan]
(go-loop
[acc []]
(let [[res c] (alts! [(timeout time-out)
rq-chan])
fresh (= c rq-chan)
timed-out (and (not fresh) (seq acc))
acc (if fresh
(conj acc res)
acc)]
(recur
(if (or (> (count acc) chunk-size)
timed-out)
(do (>! result-chan acc)
[])
acc)))))
@noisesmith
Copy link
Author

noisesmith commented Feb 7, 2017

repl session:

kingfisher.core=> (require '[clojure.core.async :as >])
nil
kingfisher.core=> (def chan-a (>/chan))
#'kingfisher.core/chan-a
kingfisher.core=> (def chan-b (>/chan))
#'kingfisher.core/chan-b
kingfisher.core=> (>/go-loop [] (let [x (>/<! chan-b)] (println (str "read " x))) (recur))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x6a6203db "clojure.core.async.impl.channels.ManyToManyChannel@6a6203db"]
kingfisher.core=> (>/>!! chan-b :foo)
read :foo
true
kingfisher.core=> (load-file "/tmp/chunker.clj")
#'noisesmith.chunker/launch-chunks
kingfisher.core=> (noisesmith.chunker/launch-chunks 3 10000 chan-a chan-b)
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x4f3b5399 "clojure.core.async.impl.channels.ManyToManyChannel@4f3b5399"]
kingfisher.core=> (>/>!! chan-a 1)
true
kingfisher.core=> read [1]


kingfisher.core=> (dotimes [i 10] (>/>!! chan-a i))
read [0 1 2 3]
read [4 5 6 7]
nil
kingfisher.core=> read [8 9]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment