@ghadishayban
Last active August 29, 2015 14:04
"turnstile" for flow-control
(ns flow-control
  (:require [clojure.core.async :as async :refer :all
             :exclude (map merge partition-by partition unique take into reduce split)]))
;; flow control through a "turnstile"
;; kind of like an extensible circuit breaker
;; Overview --
;; Your program's processes are like people entering a subway,
;; but the central authority only allows certain tickets in.
;; Mechanism --
;; Interested parties register a channel with a set of attributes
;; representing needed resources, like #{:queue :mta :database}.
;; Each process tries to push the "turnstile" (channel) by putting.
;; The put proceeds only if every one of the process's registered attrs
;; is currently permitted.
;; Usage at the bottom
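;; A minimal sketch of the permit rule described above, using only
;; clojure.core. The attr sets here are illustrative values, not part
;; of the API: a set acts as a predicate, so `every?` checks that each
;; needed attr is in the permitted set.
(comment
  (every? #{:database :mta} #{:database})   ;; => true,  needed attrs all permitted
  (every? #{:database} #{:database :mta})   ;; => false, :mta is not permitted
  (every? #{:queue} #{}))                   ;; => true,  no needed attrs always passes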
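(defprotocol FlowControl
  (register [_ ch ks] "register a channel with needed attrs")
  (allow [_ attr] "permit an attr; returns the updated set of permitted attrs")
  (block [_ attr] "revoke an attr; returns the updated set of permitted attrs")
  (terminate [_] "stop the turnstile's monitor loop"))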
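(defn ^:private monitor
  "Runs the turnstile. Continuously takes from every registered channel
  whose attrs are all currently permitted (a subset of the permits)."
  [chs permits cancel changed]
  (let [;; channels whose attr set v is fully contained in attrs
        pick (fn [attrs chs]
               (reduce-kv
                (fn [ret c v]
                  (if (every? attrs v)
                    (conj ret c)
                    ret))
                [] chs))
        ;; always listen on `changed` and `cancel` as well
        calc-reads #(conj (pick @permits @chs)
                          changed
                          cancel)]
    (go-loop []
      ;; a take from a registered channel lets that process through;
      ;; a take from `changed` just forces the read set to be recomputed
      (let [[_ ch] (alts! (calc-reads))]
        (when-not (= ch cancel) (recur))))))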
(defn turnstile
  "Creates a turnstile with no channels registered and no attrs permitted."
  []
  (let [chs (atom {})      ;; ch -> set of attrs
        permits (atom #{}) ;; allowed attrs
        cancel (chan)
        changed (chan (sliding-buffer 1))]
    (monitor chs permits cancel changed)
    (reify
      FlowControl
      (register [_ ch korks]
        ;; accept either a single keyword or a collection of keywords
        (let [ks (if (keyword? korks)
                   #{korks}
                   (set korks))]
          (swap! chs assoc ch ks))
        (put! changed true))
      (allow [_ attr]
        (let [ps (swap! permits conj attr)]
          (put! changed true)
          ps))
      (block [_ attr]
        (let [ps (swap! permits disj attr)]
          (put! changed true)
          ps))
      (terminate [_] (close! cancel)))))
;;;;;; usage sample
(defn printing-process
  [barrier tag]
  (thread
    (while true
      (>!! barrier :dummy)
      (println tag)
      (Thread/sleep (rand-int 2000)))))
(defn launch-procs
  [t]
  (let [procs [:queue "I need a queue"
               :database "I only need a DB"
               #{:database :mta} "I need a DB *and* the MTAs"]]
    (doseq [[attrs proc-label] (partition 2 procs) :let [c (chan)]]
      (register t c attrs)
      (printing-process c proc-label))))
(comment
  (def t (turnstile))
  (launch-procs t)     ;; nothing prints
  (allow t :mta)       ;; still nothing
  (allow t :database)  ;; processes 2 & 3 print
  (allow t :queue)     ;; all three print
  (block t :mta)       ;; processes 1 & 2
  (block t :queue)     ;; process 2 only
  (block t :database)) ;; none
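;; Shutting down is not shown in the sample above. A minimal sketch,
;; assuming `t` is the turnstile defined in the previous comment block:
(comment
  ;; closes `cancel`; once the monitor loop exits, nothing takes from the
  ;; registered channels, so subsequent puts block
  (terminate t))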
;; print any ManyToManyChannel as "<!>"
(defmethod print-method
  clojure.core.async.impl.channels.ManyToManyChannel
  [_ ^java.io.Writer w]
  (.write w "<!>"))