Skip to content

Instantly share code, notes, and snippets.

@dball
Created August 2, 2014 15:19
Show Gist options
  • Save dball/2cebfd9e7a815f813eff to your computer and use it in GitHub Desktop.
Save dball/2cebfd9e7a815f813eff to your computer and use it in GitHub Desktop.
core.async flow limiter
(ns dball.util.async
(:require [clojure.core.async :refer [go-loop >! <! timeout close! chan dropping-buffer >!! alts!]]
[clojure.tools.logging :as log])
(:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit]))
(defn ^ScheduledExecutorService scheduler
  "Creates and returns a fresh single-threaded scheduled executor.

  Every call builds a new executor; nothing here retains or shuts it down,
  so the caller is responsible for calling `.shutdown` on the result."
  []
  (Executors/newSingleThreadScheduledExecutor))
(defn ticker-chan
  "Returns a channel onto which `:tick` is put every `period-ms`
  milliseconds, the first tick arriving `period-ms` after this call.

  `scheduler` is the scheduled executor that runs the periodic task. The
  channel uses a dropping buffer of size 1, so at most one unconsumed tick
  is retained and the scheduler thread never blocks on the put.

  Close the returned channel to stop the ticker: the next scheduled run
  sees the put fail and cancels the periodic task."
  [scheduler period-ms]
  (let [ticks   (chan (dropping-buffer 1))
        ;; A promise rather than an atom: dereferencing blocks until the
        ;; future has been delivered, so a tick that fires very early can
        ;; never observe a missing task handle.
        task    (promise)
        tick-fn #(when-not (>!! ticks :tick)
                   ;; >!! returns false once `ticks` is closed. Future.cancel
                   ;; takes a boolean; `false` because the task is cancelling
                   ;; itself and must not interrupt its own worker thread.
                   (.cancel ^java.util.concurrent.Future @task false))]
    (deliver task (.scheduleAtFixedRate scheduler tick-fn period-ms period-ms
                                        TimeUnit/MILLISECONDS))
    ticks))
(defn flow-limiter
  "Forwards values from channel `from` to channel `to`, pacing them with a
  fixed-rate ticker of period `min-period-ms`.

  A value that arrives while the limiter is idle is forwarded immediately
  and starts a ticker. A value that arrives while the ticker is running is
  held until the next tick before being forwarded. If a tick arrives with
  no value waiting, the ticker is stopped and the limiter becomes idle.

  When `from` closes the limiter terminates, closing `to` unless `close?`
  is false (it defaults to true). The limiter also terminates, and stops
  consuming `from`, as soon as a put on `to` reports that `to` is closed.

  A dedicated scheduler is created per call and shut down on termination.
  Returns the go-loop's channel, which closes when the limiter terminates."
  ([from to min-period-ms]
   (flow-limiter from to min-period-ms true))
  ([from to min-period-ms close?]
   (let [sched (scheduler)]
     (go-loop [state :ready
               ticks nil]
       (condp = state
         ;; Idle: no ticker is running, so the next value goes out at once.
         :ready
         (if-some [value (<! from)]
           (if (>! to value)
             (recur :pause (ticker-chan sched min-period-ms))
             ;; `to` is closed: nothing downstream can receive, so stop.
             (recur :shutdown nil))
           (recur :shutdown nil))

         ;; A ticker is running: either a value or a tick arrives first.
         ;; :priority makes `from` win when both are ready.
         :pause
         (let [[value port] (alts! [from ticks] :priority true)]
           (condp = port
             from
             (if (some? value)
               (do
                 ;; Hold the value until the next tick before forwarding.
                 (<! ticks)
                 (if (>! to value)
                   (recur :pause ticks)
                   (do
                     (close! ticks)
                     (recur :shutdown nil))))
               ;; `from` closed while pausing.
               (do
                 (close! ticks)
                 (recur :shutdown nil)))

             ;; A full period elapsed with no input: stop ticking, go idle.
             ticks
             (do
               (close! ticks)
               (recur :ready nil))))

         :shutdown
         (do
           (when close? (close! to))
           (.shutdown sched)))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment