Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created March 27, 2020 21:53
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hiredman/60e5e6f0025cb38c8a7739d21f5a8ce6 to your computer and use it in GitHub Desktop.
Save hiredman/60e5e6f0025cb38c8a7739d21f5a8ce6 to your computer and use it in GitHub Desktop.
;; use spec to define stateful protocols inspired by UBF(b)
;; https://ubf.github.io/ubf/ubf-user-guide.en.html
(require '[clojure.spec.alpha :as s]
'[clojure.core.async :as async])
(defn ubfish
"Takes a protocol definition and four channels. Checks for protocol
violations while copying from-server to-client and from-client
to-server."
[ubspec from-client to-client from-server to-server]
(let [ubspec (into {}
(for [[state-name {:keys [input output events]}] ubspec
:when (not= state-name :ANY)]
[state-name {:input (s/or
:input (or input (constantly false))
:any-input (or (:input (:ANY ubspec))
(constantly false)))
:output (into {} (for [[case-name spec] (merge {} output (:output (:ANY ubspec)))]
[case-name (s/or
:output spec
:event (or events (constantly false))
:any-event (or (:events (:ANY ubspec))
(constantly false)))]))
:events (s/or
:event (or events (constantly false))
:any-event (:events (:ANY ubspec) (constantly false)))}]))]
(async/go-loop [s* (get ubspec :start)]
(async/alt!
from-client ([msg]
(if (some? msg)
(let [conformed-value (s/conform (:input s*) msg)]
(if (s/invalid? conformed-value)
(do
(let [explanation (s/explain-data (:input s*) msg)]
(async/>! to-server (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/incorrect))
(async/>! to-client (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/incorrect)))
(async/close! from-server)
(async/close! to-server)
(async/close! to-client)
(async/close! from-client))
(let [[input-tag [case-name _]] conformed-value]
(let [output-spec (get-in s* [:output case-name])
_ (async/>! to-server msg)
new-state (loop []
(let [maybe-response (async/<! from-server)
conformed-value (s/conform output-spec maybe-response)]
(if (s/invalid? conformed-value)
(do
(let [explanation (s/explain-data output-spec maybe-response)]
(async/>! to-server (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/fault))
(async/>! to-client (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/fault)))
(async/close! from-server)
(async/close! to-server)
(async/close! to-client)
(async/close! from-client)
false)
(let [[tag value] conformed-value]
(case tag
:output (let [[new-state _] value]
(async/>! to-client maybe-response)
(case input-tag
:input (get ubspec new-state)
:any-input s*))
(:event :any-event) (do
(async/>! to-client maybe-response)
(recur)))))))]
(when new-state
(recur new-state))))))
(async/close! to-server)))
from-server ([msg]
(if (some? msg)
(if (s/valid? (:events s*) msg)
(do
(async/>! to-client msg)
(recur s*))
(do
(let [explanation (s/explain-data (:events s*) msg)]
(async/>! to-server (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/fault))
(async/>! to-client (assoc explanation
:cognitect.anomalies/anomaly
:cognitect.anomalies/fault)))
(async/close! from-server)
(async/close! to-server)
(async/close! to-client)
(async/close! from-client)))
(async/close! to-client)))))))
;; example chat server spec and server
(s/def ::logon #{:logon})
(s/def ::proceed (s/tuple #{:ok} ::nick))
(s/def ::list-groups #{:groups})
(s/def ::join-group (s/tuple #{:join} ::group))
(s/def ::leave-group (s/tuple #{:leave} ::group))
(s/def ::change-nick (s/tuple #{:nick} ::nick))
(s/def ::msg (s/tuple #{:msg} ::group string?))
(s/def ::group string?)
(s/def ::groups (s/coll-of ::group))
(s/def ::nick string?)
(s/def ::ok #{:ok})
(s/def ::msg-event (s/tuple #{:msg} ::nick string? ::group))
(s/def ::join-event (s/tuple #{:joins} ::nick ::group))
(s/def ::leave-event (s/tuple #{:leaves} ::nick ::group))
(s/def ::change-nick-event (s/tuple #{:changes-name} ::nick ::nick ::group))
(s/def ::info #{:info})
(s/def ::description #{:description})
(s/def ::contract #{:contract})
(def protocol
{;; always starts in the start state
:start {:input (s/or :logon ::logon)
:output {:logon (s/or :active ::proceed)}}
:active {:input (s/or
:list-groups ::list-groups
:join-group ::join-group
:leave-group ::leave-group
:change-nick ::change-nick
:msg ::msg)
:output {:list-groups (s/or :active ::groups)
:join-group (s/or :active ::ok)
:leave-group (s/or :active ::ok)
:change-nick (s/or :active boolean?)
:msg (s/or :active boolean?)}
:events (s/or
:msg-event ::msg-event
:join-event ::join-event
:leave-event ::leave-event
:change-nick-event ::change-nick-event)}
;; Any is a special state that is merged into all the other
;; states. No state transitions
:ANY {:input (s/or
:info ::info
:description ::description
:contract ::contract)
:output {:info string?
:description string?
:contract any?}}})
(require '[clojure.edn :as edn]
'[clojure.java.io :as io])
(let [broadcast (async/chan)
broadcast-p (async/pub broadcast last)
groups (atom #{})]
(with-open [sock (java.net.ServerSocket. 3456)]
(loop []
(let [to-client (async/chan)
from-client (async/chan)
to-server (async/chan)
from-server (async/chan)
_ (ubfish protocol from-client to-client from-server to-server)
client (.accept sock)
in (.getInputStream client)
out (.getOutputStream client)
c (async/go
(loop [nick (str (gensym))
part-of-groups #{}]
(async/alt!
to-server ([form]
(cond (or (= form ::server-violation)
(= form ::client-violation))
nil
(= form :info)
(do
(async/>! from-server "Some Info")
(recur nick part-of-groups))
(= form :description)
(do
(async/>! from-server "Some Chat Server")
(recur nick part-of-groups))
(= form :contract)
(assert nil)
(= form :logon)
(do
(async/>! from-server [:ok nick])
(recur nick part-of-groups))
(= form :groups)
(do
(async/>! from-server @groups)
(recur nick part-of-groups))
(= (first form) :join)
(let [[_ group] form]
(if (contains? part-of-groups group)
(do
(async/>! from-server :ok)
(recur nick part-of-groups))
(do
(async/sub broadcast-p group from-server)
(swap! groups conj group)
(async/>! broadcast [:joins nick group])
(async/>! from-server :ok)
(recur nick (conj part-of-groups group)))))
(= (first form) :leave)
(let [[_ group] form]
(if (contains? part-of-groups group)
(do
(async/>! from-server :ok)
(recur nick part-of-groups))
(do
(async/>! broadcast [:leaves nick group])
(async/>! from-server :ok)
(async/unsub broadcast-p group from-server)
(recur nick (disj part-of-groups group)))))
(= (first form) :change-nick)
(let [[_ new-nick] form]
(doseq [group part-of-groups]
(async/>! broadcast [:change-nick nick new-nick group]))
(async/>! from-server true)
(recur new-nick
part-of-groups))
(= (first form) :msg)
(let [[_ group text] form]
(if (contains? part-of-groups group)
(do
(async/>! from-server (boolean (async/>! broadcast [:msg nick text group])))
(recur nick part-of-groups))
(do
(async/>! from-server false)
(recur nick part-of-groups))))
:else
(assert false))))))
a (async/go
(with-open [rdr (-> in io/reader clojure.lang.LineNumberingPushbackReader.)]
(loop []
(let [form (async/<! (async/thread (read rdr)))]
(when (some? form)
(when (async/>! from-client form)
(recur)))))))
b (async/go
(with-open [wrt (-> out java.io.PrintWriter.)]
(loop []
(let [form (async/<! to-client)]
(when (some? form)
(when (some? (async/<!
(async/thread
(binding [*out* wrt]
(prn form))
true)))
(recur)))))))]
(async/go
(async/<! a)
(async/<! b)
(async/close! to-client)
(async/close! from-client)
(async/close! to-server)
(async/close! from-server)
(.close in)
(.close out)
(.close client)))
(recur))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment