;; Use clojure.spec to define stateful protocols, inspired by UBF(B):
;; https://ubf.github.io/ubf/ubf-user-guide.en.html
;; spec describes the protocol messages; core.async carries them between peers.
(require '[clojure.spec.alpha :as s]
         '[clojure.core.async :as async])
(defn- expand-states
  "Builds the runtime state table from a protocol definition.

  Every state except :ANY is returned as a map with
    :input  - s/or of the state's own :input (tag :input) and the :ANY
              state's :input (tag :any-input)
    :output - case-name -> s/or of the reply spec (tag :output), the state's
              events (tag :event) and the :ANY events (tag :any-event); the
              :ANY outputs are merged over the state's own outputs
    :events - s/or of the state's events and the :ANY events
  A missing spec is replaced by a predicate that rejects everything."
  [ubspec]
  (let [any        (:ANY ubspec)
        any-input  (or (:input any) (constantly false))
        ;; `or` (rather than a keyword lookup with a default) so that an
        ;; explicit nil :events is handled like a missing one, as for :input.
        any-events (or (:events any) (constantly false))]
    (into {}
          (for [[state-name {:keys [input output events]}] ubspec
                :when (not= state-name :ANY)]
            [state-name
             {:input  (s/or
                       :input (or input (constantly false))
                       :any-input any-input)
              :output (into {}
                            (for [[case-name spec] (merge {} output (:output any))]
                              [case-name (s/or
                                          :output spec
                                          :event (or events (constantly false))
                                          :any-event any-events)]))
              :events (s/or
                       :event (or events (constantly false))
                       :any-event any-events)}]))))

(defn- violation
  "Explain-data for `msg` against `spec`, tagged with the anomaly `category`."
  [spec msg category]
  (assoc (s/explain-data spec msg)
         :cognitect.anomalies/anomaly
         category))

(defn- close-all!
  "Closes the four channels of a session."
  [from-client to-client from-server to-server]
  (async/close! from-server)
  (async/close! to-server)
  (async/close! to-client)
  (async/close! from-client))

(defn ubfish
  "Takes a protocol definition and four channels. Checks for protocol
  violations while copying from-server to-client and from-client
  to-server.

  `ubspec` maps state names to {:input :output :events}; the session starts
  in :start and the :ANY state is merged into every other state. A state's
  :input is expected to be an s/or whose tag names the request case, and
  :output maps that case name to the spec of the reply. For regular states
  the reply spec is an s/or whose tag is the name of the next state; replies
  to :ANY requests never change state.

  After a request has been forwarded, messages from the server are read until
  the reply arrives; events received in the meantime are passed on to the
  client.

  On a violation the explain-data, with :cognitect.anomalies/anomaly set to
  :cognitect.anomalies/incorrect (bad client input) or
  :cognitect.anomalies/fault (bad server reply or event), is put on both
  to-server and to-client and all four channels are closed. When from-client
  closes, to-server is closed; when from-server closes, to-client is closed.

  Returns the channel of the go block running the session."
  [ubspec from-client to-client from-server to-server]
  (let [states    (expand-states ubspec)
        shutdown! #(close-all! from-client to-client from-server to-server)]
    (async/go-loop [s* (get states :start)]
      (async/alt!
        from-client
        ([msg]
         (if (some? msg)
           (let [conformed (s/conform (:input s*) msg)]
             (if (s/invalid? conformed)
               (let [report (violation (:input s*) msg :cognitect.anomalies/incorrect)]
                 (async/>! to-server report)
                 (async/>! to-client report)
                 (shutdown!))
               (let [[input-tag [case-name _]] conformed
                     output-spec (get-in s* [:output case-name])
                     _ (async/>! to-server msg)
                     next-state
                     (loop []
                       (let [response (async/<! from-server)
                             conformed-response (s/conform output-spec response)]
                         (if (s/invalid? conformed-response)
                           (let [report (violation output-spec response :cognitect.anomalies/fault)]
                             (async/>! to-server report)
                             (async/>! to-client report)
                             (shutdown!)
                             ;; false stops the outer loop
                             false)
                           (let [[tag value] conformed-response]
                             (case tag
                               :output (do
                                         (async/>! to-client response)
                                         (case input-tag
                                           ;; Only state-specific replies carry a
                                           ;; [next-state value] pair; :ANY replies
                                           ;; are plain values and must not be
                                           ;; destructured.
                                           :input (get states (first value))
                                           :any-input s*))
                               (:event :any-event) (do
                                                     (async/>! to-client response)
                                                     (recur)))))))]
                 (when next-state
                   (recur next-state)))))
           (async/close! to-server)))
        from-server
        ([msg]
         (if (some? msg)
           (if (s/valid? (:events s*) msg)
             (do
               (async/>! to-client msg)
               (recur s*))
             (let [report (violation (:events s*) msg :cognitect.anomalies/fault)]
               (async/>! to-server report)
               (async/>! to-client report)
               (shutdown!)))
           (async/close! to-client)))))))
;; Example chat server: message specs.

;; Basic values.
(s/def ::nick string?)
(s/def ::group string?)
(s/def ::groups (s/coll-of ::group))
(s/def ::ok #{:ok})

;; Requests sent by the client.
(s/def ::logon #{:logon})
(s/def ::list-groups #{:groups})
(s/def ::join-group (s/tuple #{:join} ::group))
(s/def ::leave-group (s/tuple #{:leave} ::group))
(s/def ::change-nick (s/tuple #{:nick} ::nick))
(s/def ::msg (s/tuple #{:msg} ::group string?))

;; Reply to a logon: the nick the server assigned.
(s/def ::proceed (s/tuple #{:ok} ::nick))

;; Asynchronous events pushed by the server; the group is always last.
(s/def ::msg-event (s/tuple #{:msg} ::nick string? ::group))
(s/def ::join-event (s/tuple #{:joins} ::nick ::group))
(s/def ::leave-event (s/tuple #{:leaves} ::nick ::group))
(s/def ::change-nick-event (s/tuple #{:changes-name} ::nick ::nick ::group))

;; Requests that are accepted in every state.
(s/def ::info #{:info})
(s/def ::description #{:description})
(s/def ::contract #{:contract})
;; Protocol definition for the example chat server.
;;
;; Each state has an :input spec (an s/or whose tag names the request), an
;; :output map from request name to the reply spec, and optionally an :events
;; spec for messages the server may push. For ordinary states the tag of the
;; reply's s/or is the state to move to.
(def protocol
  {;; always starts in the start state
   :start {:input (s/or :logon ::logon)
           :output {:logon (s/or :active ::proceed)}}
   :active {:input (s/or
                    :list-groups ::list-groups
                    :join-group ::join-group
                    :leave-group ::leave-group
                    :change-nick ::change-nick
                    :msg ::msg)
            :output {:list-groups (s/or :active ::groups)
                     :join-group (s/or :active ::ok)
                     :leave-group (s/or :active ::ok)
                     :change-nick (s/or :active boolean?)
                     :msg (s/or :active boolean?)}
            :events (s/or
                     :msg-event ::msg-event
                     :join-event ::join-event
                     :leave-event ::leave-event
                     :change-nick-event ::change-nick-event)}
   ;; Any is a special state that is merged into all the other
   ;; states. No state transitions, so its outputs are plain specs rather
   ;; than an s/or tagged with a next state.
   :ANY {:input (s/or
                 :info ::info
                 :description ::description
                 :contract ::contract)
         :output {:info string?
                  :description string?
                  :contract any?}}})
(require '[clojure.edn :as edn]
         '[clojure.java.io :as io])

;; Example chat server.
;;
;; Listens on port 3456 and, for every accepted connection, wires four
;; channels through `ubfish` so that every message in either direction is
;; checked against `protocol`:
;;
;;   socket -> a -> from-client -> ubfish -> to-server   -> c (server logic)
;;   socket <- b <- to-client   <- ubfish <- from-server <- c / broadcast
;;
;; `broadcast` is published by the last element of each event (the group
;; name); joining a group subscribes the connection's from-server channel.
(let [broadcast (async/chan)
      broadcast-p (async/pub broadcast last)
      groups (atom #{})]
  (with-open [sock (java.net.ServerSocket. 3456)]
    (loop []
      (let [client (.accept sock)
            in (.getInputStream client)
            out (.getOutputStream client)
            to-client (async/chan)
            from-client (async/chan)
            to-server (async/chan)
            from-server (async/chan)
            _ (ubfish protocol from-client to-client from-server to-server)
            ;; c: server logic for this connection
            c (async/go
                (loop [nick (str (gensym))
                       part-of-groups #{}]
                  (async/alt!
                    to-server
                    ([form]
                     (cond
                       ;; Stop when the session is over: to-server was closed
                       ;; (nil), or ubfish reported a protocol violation as an
                       ;; anomaly map.
                       (or (nil? form)
                           (and (map? form)
                                (contains? form :cognitect.anomalies/anomaly))
                           (= form ::server-violation)
                           (= form ::client-violation))
                       nil

                       (= form :info)
                       (do
                         (async/>! from-server "Some Info")
                         (recur nick part-of-groups))

                       (= form :description)
                       (do
                         (async/>! from-server "Some Chat Server")
                         (recur nick part-of-groups))

                       ;; NOTE(review): not implemented; this throws and ends
                       ;; the server loop for the connection.
                       (= form :contract)
                       (assert nil)

                       (= form :logon)
                       (do
                         (async/>! from-server [:ok nick])
                         (recur nick part-of-groups))

                       (= form :groups)
                       (do
                         (async/>! from-server @groups)
                         (recur nick part-of-groups))

                       (= (first form) :join)
                       (let [[_ group] form]
                         (if (contains? part-of-groups group)
                           (do
                             (async/>! from-server :ok)
                             (recur nick part-of-groups))
                           (do
                             (async/sub broadcast-p group from-server)
                             (swap! groups conj group)
                             (async/>! broadcast [:joins nick group])
                             (async/>! from-server :ok)
                             (recur nick (conj part-of-groups group)))))

                       (= (first form) :leave)
                       (let [[_ group] form]
                         ;; Only a current member has anything to leave.
                         (if (contains? part-of-groups group)
                           (do
                             (async/>! broadcast [:leaves nick group])
                             (async/>! from-server :ok)
                             (async/unsub broadcast-p group from-server)
                             (recur nick (disj part-of-groups group)))
                           (do
                             (async/>! from-server :ok)
                             (recur nick part-of-groups))))

                       ;; The request is [:nick new-nick] (see ::change-nick)
                       ;; and the event tag is :changes-name
                       ;; (see ::change-nick-event).
                       (= (first form) :nick)
                       (let [[_ new-nick] form]
                         (doseq [group part-of-groups]
                           (async/>! broadcast [:changes-name nick new-nick group]))
                         (async/>! from-server true)
                         (recur new-nick
                                part-of-groups))

                       (= (first form) :msg)
                       (let [[_ group text] form]
                         (if (contains? part-of-groups group)
                           (do
                             (async/>! from-server (boolean (async/>! broadcast [:msg nick text group])))
                             (recur nick part-of-groups))
                           (do
                             (async/>! from-server false)
                             (recur nick part-of-groups))))

                       :else
                       (assert false))))))
            ;; a: socket reader. Input is untrusted, so it is parsed as EDN
            ;; (data only) rather than with the Clojure reader, which can
            ;; evaluate code. End of stream yields nil and ends the loop.
            a (async/go
                (with-open [rdr (-> in io/reader clojure.lang.LineNumberingPushbackReader.)]
                  (loop []
                    (let [form (async/<! (async/thread (edn/read {:eof nil} rdr)))]
                      (when (some? form)
                        (when (async/>! from-client form)
                          (recur)))))))
            ;; b: socket writer; runs until to-client is closed.
            b (async/go
                (with-open [wrt (-> out java.io.PrintWriter.)]
                  (loop []
                    (let [form (async/<! to-client)]
                      (when (some? form)
                        (when (some? (async/<!
                                      (async/thread
                                        (binding [*out* wrt]
                                          (prn form))
                                        true)))
                          (recur)))))))]
        ;; Teardown. The writer only stops once to-client is closed, so the
        ;; channels are closed as soon as the reader is done and before
        ;; waiting for the writer.
        (async/go
          (async/<! a)
          (async/close! to-client)
          (async/close! from-client)
          (async/close! to-server)
          (async/close! from-server)
          (async/<! b)
          (.close in)
          (.close out)
          (.close client)))
      (recur))))
;; [Web page footer captured together with this listing; not part of the program.]
;; Sign up for free to join this conversation on GitHub.
;; Already have an account? Sign in to comment.