Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
;; use spec to define stateful protocols inspired by UBF(b)
;; https://ubf.github.io/ubf/ubf-user-guide.en.html
(require '[clojure.spec.alpha :as s]
'[clojure.core.async :as async])
(defn- anomaly
  "Returns `explanation` (spec explain-data, possibly nil) tagged with the
  given cognitect anomaly `category`."
  [explanation category]
  (assoc explanation :cognitect.anomalies/anomaly category))

(defn- close-all!
  "Closes all four channels of a session, in the order from-server,
  to-server, to-client, from-client."
  [from-client to-client from-server to-server]
  (async/close! from-server)
  (async/close! to-server)
  (async/close! to-client)
  (async/close! from-client))

(defn- compile-ubspec
  "Expands a protocol definition into a map of state-name -> compiled state.
  The :ANY entry is not a state of its own; its :input, :output and :events
  are merged into every other state. Each compiled state holds:
    :input  - s/or tagging a message as :input (state specific) or :any-input
    :output - map of input case-name -> s/or tagging a server message as
              :output (the reply), :event or :any-event
    :events - s/or tagging an unsolicited server message as :event/:any-event
  Absent specs are replaced by a predicate that rejects everything."
  [ubspec]
  (into {}
        (for [[state-name {:keys [input output events]}] ubspec
              :when (not= state-name :ANY)]
          [state-name
           {:input (s/or
                    :input (or input (constantly false))
                    :any-input (or (:input (:ANY ubspec))
                                   (constantly false)))
            :output (into {}
                          (for [[case-name spec] (merge {} output (:output (:ANY ubspec)))]
                            [case-name (s/or
                                        :output spec
                                        :event (or events (constantly false))
                                        :any-event (or (:events (:ANY ubspec))
                                                       (constantly false)))]))
            :events (s/or
                     :event (or events (constantly false))
                     :any-event (or (:events (:ANY ubspec))
                                    (constantly false)))}])))

(defn ubfish
  "Takes a protocol definition and four channels. Checks for protocol
  violations while copying from-server to-client and from-client
  to-server.

  ubspec is a map of state-name -> {:input spec, :output {case-name spec},
  :events spec}; the session starts in the :start state and the :ANY entry
  is merged into every state. A state's :input is expected to be an s/or
  whose tags name the request cases, and each of its :output specs an s/or
  whose tag names the state to move to. :ANY outputs are plain specs and
  never change state.

  After forwarding a client request the loop waits on from-server for the
  reply, forwarding any events that arrive in the meantime.

  On a violation the spec explain-data, tagged with
  :cognitect.anomalies/anomaly (:cognitect.anomalies/incorrect for a bad
  client message, :cognitect.anomalies/fault for a bad server message), is
  put on to-server and to-client and all four channels are closed. When
  from-client closes, to-server is closed; when from-server closes while
  idle, to-client is closed. A from-server that closes while a reply is
  awaited is reported as a fault.

  Returns the channel of the go-loop driving the session."
  [ubspec from-client to-client from-server to-server]
  (let [ubspec (compile-ubspec ubspec)]
    (async/go-loop [s* (get ubspec :start)]
      (async/alt!
        from-client
        ([msg]
         (if (some? msg)
           (let [conformed-value (s/conform (:input s*) msg)]
             (if (s/invalid? conformed-value)
               (do
                 (let [report (anomaly (s/explain-data (:input s*) msg)
                                       :cognitect.anomalies/incorrect)]
                   (async/>! to-server report)
                   (async/>! to-client report))
                 (close-all! from-client to-client from-server to-server))
               (let [[input-tag [case-name _]] conformed-value
                     output-spec (get-in s* [:output case-name])
                     _ (async/>! to-server msg)
                     ;; Wait for the reply to this request. Yields the next
                     ;; state, or false once a violation has shut things down.
                     new-state
                     (loop []
                       (let [maybe-response (async/<! from-server)
                             ;; nil means from-server was closed; it is never
                             ;; a valid reply (and cannot be put on a channel),
                             ;; even if the output spec would accept nil.
                             conformed-value (when (some? maybe-response)
                                               (s/conform output-spec maybe-response))]
                         (if (or (nil? maybe-response)
                                 (s/invalid? conformed-value))
                           (do
                             (let [report (anomaly (s/explain-data output-spec maybe-response)
                                                   :cognitect.anomalies/fault)]
                               (async/>! to-server report)
                               (async/>! to-client report))
                             (close-all! from-client to-client from-server to-server)
                             false)
                           (let [[tag value] conformed-value]
                             (case tag
                               :output
                               (do
                                 (async/>! to-client maybe-response)
                                 (case input-tag
                                   ;; Only state-specific replies are tagged
                                   ;; with their target state. :ANY replies are
                                   ;; arbitrary values and must not be
                                   ;; destructured.
                                   :input (let [[new-state _] value]
                                            (get ubspec new-state))
                                   :any-input s*))

                               (:event :any-event)
                               (do
                                 (async/>! to-client maybe-response)
                                 (recur)))))))]
                 (when new-state
                   (recur new-state)))))
           (async/close! to-server)))

        from-server
        ([msg]
         (if (some? msg)
           (if (s/valid? (:events s*) msg)
             (do
               (async/>! to-client msg)
               (recur s*))
             (do
               (let [report (anomaly (s/explain-data (:events s*) msg)
                                     :cognitect.anomalies/fault)]
                 (async/>! to-server report)
                 (async/>! to-client report))
               (close-all! from-client to-client from-server to-server)))
           (async/close! to-client)))))))
;; example chat server spec and server
;; --- building blocks ------------------------------------------------
(s/def ::nick string?)
(s/def ::group string?)
(s/def ::groups (s/coll-of ::group))
(s/def ::ok #{:ok})

;; --- client requests ------------------------------------------------
(s/def ::logon #{:logon})
(s/def ::list-groups #{:groups})
(s/def ::join-group (s/tuple #{:join} ::group))
(s/def ::leave-group (s/tuple #{:leave} ::group))
(s/def ::change-nick (s/tuple #{:nick} ::nick))
(s/def ::msg (s/tuple #{:msg} ::group string?))

;; --- requests accepted in every state -------------------------------
(s/def ::info #{:info})
(s/def ::description #{:description})
(s/def ::contract #{:contract})

;; --- server replies -------------------------------------------------
(s/def ::proceed (s/tuple #{:ok} ::nick))

;; --- server events; the group is always the last element ------------
(s/def ::msg-event (s/tuple #{:msg} ::nick string? ::group))
(s/def ::join-event (s/tuple #{:joins} ::nick ::group))
(s/def ::leave-event (s/tuple #{:leaves} ::nick ::group))
(s/def ::change-nick-event (s/tuple #{:changes-name} ::nick ::nick ::group))
(def protocol
  (let [;; Every session begins here; a successful logon moves to :active.
        start-state {:input (s/or :logon ::logon)
                     :output {:logon (s/or :active ::proceed)}}
        ;; Logged-on state: every reply is tagged :active, so the session
        ;; stays in this state. Events are the unsolicited server messages
        ;; allowed here.
        active-state {:input (s/or
                              :list-groups ::list-groups
                              :join-group ::join-group
                              :leave-group ::leave-group
                              :change-nick ::change-nick
                              :msg ::msg)
                      :output {:list-groups (s/or :active ::groups)
                               :join-group (s/or :active ::ok)
                               :leave-group (s/or :active ::ok)
                               :change-nick (s/or :active boolean?)
                               :msg (s/or :active boolean?)}
                      :events (s/or
                               :msg-event ::msg-event
                               :join-event ::join-event
                               :leave-event ::leave-event
                               :change-nick-event ::change-nick-event)}
        ;; Pseudo state whose requests and replies are merged into all the
        ;; other states. Its replies carry no state tag, so they never cause
        ;; a transition.
        any-state {:input (s/or
                           :info ::info
                           :description ::description
                           :contract ::contract)
                   :output {:info string?
                            :description string?
                            :contract any?}}]
    {:start start-state
     :active active-state
     :ANY any-state}))
(require '[clojure.edn :as edn]
'[clojure.java.io :as io])
;; Example chat server: listens on port 3456 and serves one ubfish-checked
;; session per accepted connection. Forms are exchanged as EDN, one per line
;; on output.
(let [broadcast (async/chan)
      ;; Every broadcast event ends with its group, which is used as the topic.
      broadcast-p (async/pub broadcast last)
      groups (atom #{})]
  (with-open [sock (java.net.ServerSocket. 3456)]
    (loop []
      (let [to-client (async/chan)
            from-client (async/chan)
            to-server (async/chan)
            from-server (async/chan)
            _ (ubfish protocol from-client to-client from-server to-server)
            client (.accept sock)
            in (.getInputStream client)
            out (.getOutputStream client)
            ;; Request handler: takes checked requests from to-server and
            ;; answers on from-server.
            c (async/go
                (loop [nick (str (gensym))
                       part-of-groups #{}]
                  (let [form (async/<! to-server)]
                    (cond
                      ;; Stop when the session is over: to-server was closed
                      ;; (nil) or ubfish reported a protocol violation as an
                      ;; anomaly map. The keyword checks are kept for
                      ;; compatibility.
                      (or (nil? form)
                          (and (map? form)
                               (contains? form :cognitect.anomalies/anomaly))
                          (= form ::server-violation)
                          (= form ::client-violation))
                      nil

                      (= form :info)
                      (do
                        (async/>! from-server "Some Info")
                        (recur nick part-of-groups))

                      (= form :description)
                      (do
                        (async/>! from-server "Some Chat Server")
                        (recur nick part-of-groups))

                      ;; NOTE(review): :contract is not implemented; this
                      ;; assertion always fails and ends the handler.
                      (= form :contract)
                      (assert nil)

                      (= form :logon)
                      (do
                        (async/>! from-server [:ok nick])
                        (recur nick part-of-groups))

                      (= form :groups)
                      (do
                        (async/>! from-server @groups)
                        (recur nick part-of-groups))

                      (= (first form) :join)
                      (let [[_ group] form]
                        (if (contains? part-of-groups group)
                          (do
                            (async/>! from-server :ok)
                            (recur nick part-of-groups))
                          (do
                            (async/sub broadcast-p group from-server)
                            (swap! groups conj group)
                            (async/>! broadcast [:joins nick group])
                            (async/>! from-server :ok)
                            (recur nick (conj part-of-groups group)))))

                      (= (first form) :leave)
                      (let [[_ group] form]
                        ;; Only a current member actually leaves; leaving a
                        ;; group one is not part of is a no-op answered :ok.
                        (if (contains? part-of-groups group)
                          (do
                            (async/>! broadcast [:leaves nick group])
                            (async/>! from-server :ok)
                            (async/unsub broadcast-p group from-server)
                            (recur nick (disj part-of-groups group)))
                          (do
                            (async/>! from-server :ok)
                            (recur nick part-of-groups))))

                      ;; The ::change-nick request is [:nick new-nick] and the
                      ;; matching event is tagged :changes-name.
                      (= (first form) :nick)
                      (let [[_ new-nick] form]
                        (doseq [group part-of-groups]
                          (async/>! broadcast [:changes-name nick new-nick group]))
                        (async/>! from-server true)
                        (recur new-nick
                               part-of-groups))

                      (= (first form) :msg)
                      (let [[_ group text] form]
                        (if (contains? part-of-groups group)
                          (do
                            (async/>! from-server (boolean (async/>! broadcast [:msg nick text group])))
                            (recur nick part-of-groups))
                          (do
                            (async/>! from-server false)
                            (recur nick part-of-groups))))

                      :else
                      (assert false)))))
            ;; Reader: socket -> from-client. The input is untrusted, so it is
            ;; parsed with edn/read rather than clojure.core/read, which can
            ;; evaluate code through #=. At end of stream edn/read yields nil
            ;; and the loop ends; a read error ends it the same way.
            a (async/go
                (with-open [rdr (-> in io/reader clojure.lang.LineNumberingPushbackReader.)]
                  (loop []
                    (let [form (async/<! (async/thread (edn/read {:eof nil} rdr)))]
                      (when (some? form)
                        (when (async/>! from-client form)
                          (recur)))))))
            ;; Writer: to-client -> socket, until to-client is closed.
            b (async/go
                (with-open [wrt (-> out java.io.PrintWriter.)]
                  (loop []
                    (let [form (async/<! to-client)]
                      (when (some? form)
                        (when (some? (async/<!
                                      (async/thread
                                        (binding [*out* wrt]
                                          (prn form))
                                        true)))
                          (recur)))))))]
        ;; Teardown. Start as soon as either side is done: the reader ends on
        ;; client EOF, the writer ends when to-client is closed (for example
        ;; after a violation). Closing the channels lets the writer, ubfish and
        ;; the handler finish; closing the socket unblocks a pending read.
        (async/go
          (async/alts! [a b])
          (async/close! to-client)
          (async/close! from-client)
          (async/close! to-server)
          (async/close! from-server)
          (async/<! b)
          (.close in)
          (.close out)
          (.close client)))
      (recur))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.