Last active
August 29, 2015 14:04
-
-
Save ghadishayban/7018dbc353216801b318 to your computer and use it in GitHub Desktop.
"turnstile" for flow-control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns flow-control | |
(:require [clojure.core.async :as async :refer :all | |
:exclude (map merge partition-by partition unique take into reduce split)])) | |
;; flow control through a "turnstile"
;; kind of like an extensible circuit breaker
;; Overview --
;; Your program's processes are like people entering a subway,
;; but the central authority only allows certain tickets in.
;; Mechanism --
;; Interested parties register a channel with a set of attributes
;; representing needed resources, like #{:queue :mta :database}.
;; Each process tries to push the "turnstile" (channel) by putting.
;; The put proceeds iff every one of the process's registered attrs
;; is permitted.
;; Usage at the bottom
(defprotocol FlowControl
  "Operations of a turnstile: channels are registered together with the
  attrs they need, and attrs are allowed or blocked over time."
  (register [_ ch ks]
    "Register channel ch with the attrs ks it needs.")
  (allow [_ attr]
    "Mark attr as permitted.")
  (block [_ attr]
    "Mark attr as no longer permitted.")
  (terminate [_]
    "Shut the turnstile down."))
(defn ^:private monitor
  "Runs the turnstile: a go-loop that continuously takes from every
  registered channel whose attrs are all currently permitted, which lets
  the corresponding puts proceed.

  chs     - atom holding a map of channel -> set of required attrs
  permits - atom holding the set of currently allowed attrs
  cancel  - channel; the loop exits as soon as a take from it completes
            (for example after it has been closed)
  changed - channel signalled whenever chs or permits change, so that the
            loop recomputes which channels it reads from

  A registered channel that turns out to be closed is removed from chs;
  otherwise every take from it would complete immediately with nil and
  the loop would spin without ever parking.

  Returns the channel of the go-loop."
  [chs permits cancel changed]
  (let [pick (fn [attrs chs]
               (reduce-kv
                (fn [ret c v]
                  ;; contains? rather than invoking the set, so that a
                  ;; permitted attr of false/nil still counts as present
                  (if (every? #(contains? attrs %) v)
                    (conj ret c)
                    ret))
                [] chs))
        calc-reads #(conj (pick @permits @chs)
                          changed
                          cancel)]
    (go-loop []
      (let [[v ch] (alts! (calc-reads))]
        (when-not (= ch cancel)
          ;; a nil take from a registered channel means it was closed
          (when (and (nil? v) (not= ch changed))
            (swap! chs dissoc ch))
          (recur))))))
(defn turnstile
  "Creates a turnstile and starts its monitor loop.

  Returns an object implementing FlowControl:
    register  - associates a channel with the attrs it needs; korks is
                either a single attr (keyword, string or symbol) or a
                collection of attrs. Returns the result of signalling
                the monitor.
    allow     - adds attr to the permitted set; returns the new set.
    block     - removes attr from the permitted set; returns the new set.
    terminate - closes the cancel channel, which stops the monitor loop."
  []
  (let [chs (atom {})        ;; ch -> set of attrs
        permits (atom #{})   ;; allowed attrs
        cancel (chan)
        ;; sliding buffer of 1: change notifications never block the
        ;; caller, and several pending notifications collapse into one
        changed (chan (sliding-buffer 1))
        ;; strings would otherwise be split into characters by `set`,
        ;; and symbols would make `set` throw
        scalar? (some-fn keyword? string? symbol?)]
    (monitor chs permits cancel changed)
    (reify
      FlowControl
      (register [_ ch korks]
        (let [ks (if (scalar? korks)
                   #{korks}
                   (set korks))]
          (swap! chs assoc ch ks))
        (put! changed true))
      (allow [_ attr]
        (let [ps (swap! permits conj attr)]
          (put! changed true)
          ps))
      (block [_ attr]
        (let [ps (swap! permits disj attr)]
          (put! changed true)
          ps))
      (terminate [_] (close! cancel)))))
;;;;;; usage sample | |
(defn printing-process
  "Sample worker: on its own thread, repeatedly puts onto barrier and,
  once the put has gone through, prints tag and sleeps for a random
  interval of up to 2000 ms.

  The worker stops when the put reports failure, i.e. when barrier has
  been closed; otherwise it would keep printing without any gating.

  Returns the channel produced by `thread`."
  [barrier tag]
  (thread
    (loop []
      ;; >!! returns false once barrier is closed
      (when (>!! barrier :dummy)
        (println tag)
        (Thread/sleep (rand-int 2000))
        (recur)))))
(defn launch-procs
  "Starts the three sample printing processes against turnstile t. Each
  one gets a fresh channel that is registered with the attrs it needs
  before the process is launched."
  [t]
  (let [specs [[:queue "I need a queue"]
               [:database "I only need a DB"]
               [#{:database :mta} "I need a DB *and* the MTAs"]]]
    (doseq [[needed label] specs]
      (let [gate (chan)]
        (register t gate needed)
        (printing-process gate label)))))
;; REPL walkthrough; the trailing comments say which sample processes
;; (numbered in launch order) are able to print after each step.
(comment
  (def t (turnstile))
  (launch-procs t)      ;; nothing
  (allow t :mta)        ;; still nothing
  (allow t :database)   ;; process 2 & 3
  (allow t :queue)      ;; all
  (block t :mta)        ;; 1 & 2
  (block t :queue)      ;; 2
  (block t :database)   ;; none
  (terminate t))        ;; stop the turnstile's monitor loop when done
;; Print core.async channels as a short fixed marker.
(defmethod print-method
  clojure.core.async.impl.channels.ManyToManyChannel
  [_chan ^java.io.Writer out]
  (.write out "<!>"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment