Skip to content

Instantly share code, notes, and snippets.

@hiredman

hiredman/ws.clj

Created Jul 31, 2019
Embed
What would you like to do?
(import '(java.nio ByteBuffer)
'(java.net InetSocketAddress)
'(java.util.concurrent LinkedBlockingQueue)
'(java.nio.channels SelectionKey
SocketChannel
SelectableChannel
ServerSocketChannel
Selector)
'(java.io Closeable)
'(clojure.lang PersistentQueue)
'(java.util UUID)
'(java.util.logging Logger
Level
SimpleFormatter
ConsoleHandler
XMLFormatter))
(set! *warn-on-reflection* true)
(def m
{:init {(int \G) :a}
:a {(int \E) :b}
:b {(int \T) :c}
:c {(int \space) :d}
:d {(int \/) :e}
:e {(int \space) :f
(int \s) :u}
:f {(int \H) :g}
:g {(int \T) :h}
:h {(int \T) :i}
:i {(int \P) :j}
:j {(int \/) :k}
:k {(int \1) :l}
:l {(int \.) :m}
:m {(int \1) :n}
:n {(int \return) :o
(int \newline) :p}
:o {(int \newline) :p}
:p {:whatever :p
(int \return) :q
(int \newline) :r}
:q {:whatever :p
(int \newline) :r}
:r {:whatever :r
(int \return) :s
(int \newline) :t}
:s {:whatever :r
(int \newline) :t}
:t :index
:u {(int \space) :v}
:v {(int \H) :w}
:w {(int \T) :x}
:x {(int \T) :y}
:y {(int \P) :z}
:z {(int \/) :aa}
:aa {(int \1) :ab}
:ab {(int \.) :ac}
:ac {(int \1) :ad}
:ad {(int \return) :ae}
:ae {(int \newline) :af}
:af {(int \return) :ag
:whatever :ah}
:ag {(int \newline) :az}
:ah {:whatever :ah
(int \return) :ae}
:az :ws
})
(defonce ^Logger logger (doto (Logger/getLogger "ws-nodeps")
(.setUseParentHandlers false)
(.addHandler
(doto (ConsoleHandler.)
;; lol
(.setFormatter (XMLFormatter.))))))
(defmacro log [& data]
`(.log logger Level/INFO (str (pr-str ~@data))))
(defn encode-frame [frame]
(let [^bytes data (if (= :text (:op frame))
(.getBytes ^String (:text frame) "utf-8")
(:data frame))
size (count data)
first-byte (unchecked-byte
(bit-or (bit-shift-left (if (:fin frame) 0x1 0x0) 7)
(if (:fin frame)
(case (:op frame)
:text 1)
0x0)))]
(if (> size 125)
(if (> size 0xffff)
(let [buf (byte-array (+ size 2 8))]
(aset buf 0 first-byte)
(aset buf 1 (unchecked-byte 127))
(dotimes [i 8]
(aset buf (inc (- 8 i)) (unchecked-byte (bit-and (bit-shift-right size (* i 8)) 0xff))))
(dotimes [i size]
(aset buf (+ i 2 8) (aget data i)))
[buf])
(let [buf (byte-array (+ size 2 2))]
(aset buf 0 first-byte)
(aset buf 1 (unchecked-byte 126))
(aset buf 3 (unchecked-byte (bit-and size 0xff)))
(aset buf 2 (unchecked-byte (bit-and (bit-shift-right size 8) 0xff)))
(dotimes [i size]
(aset buf (+ i 2 2) (aget data i)))
[buf]))
(let [buf (byte-array (+ size 2))]
(aset buf 0 first-byte)
(aset buf 1 (byte size))
(dotimes [i size]
(aset buf (+ i 2) (aget data i)))
[buf]))))
(defn blob [host]
(-> "
<html>
<head>
</head>
<body>
<dl id=\"chats\">
</dl>
<textarea id=\"message\"> </textarea>
<button id=\"send\">Send</button>
<script>
var ws = new WebSocket(\"ws://HOST/s\")
ws.onopen = function (evt) {
document.getElementById(\"send\").onclick=function(evt){
var body = document.getElementById(\"message\").value;
document.getElementById(\"message\").value = \"\";
ws.send(body);
}
}
ws.onmessage=function(evt){
var d = evt.data.split(\"\\u241e\");
var nick = document.createElement(\"dt\");
nick.innerHTML=d[0];
var msg = document.createElement(\"dd\");
msg.innerHTML=d[1];
document.getElementById(\"chats\").appendChild(nick);
document.getElementById(\"chats\").appendChild(msg);
}
</script>
</body>
</html>
"
(.replaceAll "HOST" host)))
(defn ^SelectableChannel channel [^SelectionKey k]
(.channel k))
(defn channel-handler
[selector ^SelectableChannel ch ^LinkedBlockingQueue exec]
(let [accept (LinkedBlockingQueue.)
read (LinkedBlockingQueue.)
write (LinkedBlockingQueue.)]
(fn x
([^SelectionKey k]
(when (.isReadable k)
(when-let [f (.poll read)]
(exec #(f k))))
(when (.isAcceptable k)
(when-let [f (.poll accept)]
(exec #(f k))))
(when (.isWritable k)
(when-let [f (.poll write)]
(exec #(f k))))
(.register ch
selector
(bit-or (if (seq read) SelectionKey/OP_READ 0x0)
(if (seq write) SelectionKey/OP_WRITE 0x0)
(if (seq accept) SelectionKey/OP_ACCEPT 0x0))
x))
([op handler]
(case op
:read (.put read handler)
:write (.put write handler)
:accept (.put accept handler))
(.register ch
selector
(bit-or (if (seq read) SelectionKey/OP_READ 0x0)
(if (seq write) SelectionKey/OP_WRITE 0x0)
(if (seq accept) SelectionKey/OP_ACCEPT 0x0))
x)))))
(defn ws-reply [^String protos key]
(concat
[(ByteBuffer/wrap (.getBytes "HTTP/1.1 101 Switching Protocols\r\n"))
(ByteBuffer/wrap (.getBytes "Upgrade: websocket\r\n"))
(ByteBuffer/wrap (.getBytes "Connection: Upgrade\r\n"))]
(when (seq protos)
[(ByteBuffer/wrap (.getBytes "Sec-WebSocket-Protocol: "))
(ByteBuffer/wrap (.getBytes ^String (last (.split protos ","))))
(ByteBuffer/wrap (.getBytes "\r\n"))])
[(ByteBuffer/wrap
(.getBytes (format "Sec-WebSocket-Accept: %s"
(.encodeToString (java.util.Base64/getEncoder)
(.digest (java.security.MessageDigest/getInstance "SHA1")
(.getBytes (str key "258EAFA5-E914-47DA-95CA-C5AB0DC85B11") "utf-8"))))))
(ByteBuffer/wrap (.getBytes "\r\n"))
(ByteBuffer/wrap (.getBytes "\r\n"))]))
(defn fill-buffer [kont ^ByteBuffer buf ^SelectionKey k]
(assert (.isOpen (channel k)))
(.read ^SocketChannel (channel k) buf)
(if (.hasRemaining buf)
((.attachment k) :read (partial fill-buffer kont buf))
(kont buf)))
(defn decode-ws-frame [^ByteBuffer buf ^SelectionKey k kont]
(try
(assert (.isOpen (channel k)))
(if (.hasRemaining buf)
((.attachment k)
:read
(fn [k]
(.read ^SocketChannel (channel k) buf)
(decode-ws-frame buf k kont)))
(do
(.flip buf)
(let [first-byte (.get buf)
second-byte (.get buf)
first-size (bit-and (bit-and
(bit-not (bit-shift-left 0x1 7))
second-byte)
0xff)
masked? (if (zero? (bit-shift-right second-byte 7))
false
true )]
(if (> 126 first-size)
(fill-buffer
(fn [^ByteBuffer mask]
(fill-buffer
(fn [^ByteBuffer masked]
(.flip mask)
(.flip masked)
(let [unmasked (byte-array first-size)
op (case (bit-and first-byte 2r1111)
1 :text
8 :close
9 :ping)
fin (if (= 1 (bit-and 0x1 (bit-shift-right first-byte 7)))
true
false)]
(dotimes [i first-size]
(aset unmasked
i
(unchecked-byte (bit-xor
(.get masked (int i))
(.get mask (int (mod i 4)))))))
(kont
(cond (= op :text)
{:text (String. ^bytes unmasked)
:masked masked?
:fin fin
:op op}
(and (= op :close) (= 2 (count unmasked)))
{:data unmasked
:masked masked?
:fin fin
:op op
:code (bit-or (bit-shift-left (bit-and (long (aget unmasked 0)) 0xff) 8)
(bit-and (long (aget unmasked 1)) 0xff))}
:else
{:data unmasked
:masked masked?
:c (count unmasked)
:fin fin
:op op}))))
(ByteBuffer/allocate first-size)
k))
(ByteBuffer/allocate 4)
k)
(assert nil "smol")))))
(catch Throwable t
(.close (channel k))
(log t))))
(defn event-loop []
(let [run-queue (LinkedBlockingQueue.)
closed? (atom false)
selector (Selector/open)]
(future
(while (not @closed?)
(try
((.take run-queue))
(catch Throwable t
(log t)))))
(.put run-queue (fn []
((fn selecting [^long n]
(if (zero? n)
(.selectNow selector)
(.select selector n))
(let [s (.selectedKeys selector)]
(if (seq s)
(.put run-queue #(selecting 0))
(.put run-queue #(selecting (min 1000 (+ 10 n)))))
(doseq [^SelectionKey k s]
(.remove s k)
(.put run-queue #((.attachment k) k)))))
0)))
{:closed? closed?
:exec (fn [op] (.put run-queue op))
:selector selector}))
(defn websocket2 [port blob ws-frame-handler]
(let [server-socket (doto (ServerSocketChannel/open)
(.configureBlocking false)
(.bind (InetSocketAddress. "0.0.0.0" (long port))))
{:keys [closed? exec selector]} (event-loop)]
(letfn [(accept [^SelectionKey k]
(let [s (.accept server-socket)
_ (assert s)
_ (.configureBlocking s false)
ch (channel-handler selector s exec)]
(ch :read http-start)
((.attachment k) :accept accept)))
(write-to-channel [kont buffers ^SelectionKey k]
(if (seq buffers)
(let [[^ByteBuffer a & b] buffers]
(.write ^SocketChannel (channel k) a)
(if (.hasRemaining a)
((.attachment k) :write (partial write-to-channel kont buffers))
((.attachment k) :write (partial write-to-channel kont b))))
(kont k)))
(http-serve [headers ^SelectionKey k]
(let [[_ host] (re-find #"Host: (.*)" (str headers))
b (.getBytes ^String (blob host))
^SocketChannel ch (channel k)]
((.attachment k)
:write
(partial write-to-channel
#(.close (channel %))
[(ByteBuffer/wrap (.getBytes "HTTP/1.1 200 OK\r\n"))
(ByteBuffer/wrap (.getBytes "Server: Apache/2.2.14 (Win32)\r\n"))
(ByteBuffer/wrap (.getBytes "X-Clacks-Overhead: GNU Terry Pratchett\r\n"))
(ByteBuffer/wrap (.getBytes (format "Content-Length: %s\r\n" (count b))))
(ByteBuffer/wrap (.getBytes "Content-Type: text/html\r\n"))
(ByteBuffer/wrap (.getBytes "Connection: Closed\r\n"))
(ByteBuffer/wrap (.getBytes "\r\n"))
(ByteBuffer/wrap b)]))))
(ws-serve [headers ^SelectionKey k buf]
(let [[_ key] (re-find #"Sec-WebSocket-Key: (.*)" (str headers))
[_ ^String protos] (re-find #"Sec-WebSocket-Protocol: (.*)" (str headers))]
(assert key)
((.attachment k)
:write
(partial write-to-channel
((fn f [state]
(fn [^SelectionKey k]
(assert (fn? (.attachment k)))
(assert (.isOpen (channel k)))
(decode-ws-frame (ByteBuffer/allocate 2)
k
(fn [frame]
(case (:op frame)
:close (do
(log frame)
(ws-frame-handler state nil))
:ping (do
(log "ping")
((.attachment k)
:write
(partial write-to-channel
(f state)
(for [buf (encode-frame
{:fin true
:op :pong
:data (:data frame)})]
(ByteBuffer/wrap buf)))))
:text ((f (ws-frame-handler state frame)) k))))))
(ws-frame-handler (.attachment k)))
(ws-reply protos key)))))
;; TODO: this read is not safe/correct
(http-start [^SelectionKey k]
(try
(let [buf (ByteBuffer/allocate (* 4 1024 1024))
^SocketChannel ch (channel k)]
(.read ch buf)
(.flip buf)
(assert (> (.limit buf) (.position buf)))
(let [headers (StringBuffer.)]
(loop [s :init
c (.get buf)]
(.appendCodePoint headers c)
(let [ns (get-in m [s c])]
(case (get m ns)
nil (if (contains? (get m s) :whatever)
(recur (get-in m [s :whatever]) (.get buf))
(.close (channel k)))
:ws (ws-serve headers k buf)
:index (http-serve headers k)
(recur ns (.get buf)))))))
(catch Throwable t
(log t)
(.close (channel k)))))]
((channel-handler selector server-socket exec) :accept accept))
(reify
Closeable
(close [_]
(reset! closed? true)
(.close server-socket)))))
(defn check-output [m]
(assert (fn? (:handler m)))
(let [this-agent *agent*]
(if (seq (:out m))
(let [^ByteBuffer buf (peek (:out m))]
(if (.hasRemaining buf)
(do
((:handler m)
:write
(fn [k]
(assert (.isOpen (channel k)))
(.write ^SocketChannel (channel k) buf)
(send this-agent check-output)))
m)
(do
(send this-agent check-output)
(update-in m [:out] pop))))
m)))
(defonce chats (atom #{}))
(websocket2 8081
blob
(fn
([handler]
(assert (fn? handler))
(let [a (agent
{:connected? false
:subscriptions {}
:ids {}
:publishers {}
:out PersistentQueue/EMPTY
:nick (str (UUID/randomUUID))
:handler handler})]
(swap! chats conj a)
a))
([state frame]
(assert (fn? (:handler @state)))
(if (nil? frame)
(do
(swap! chats disj state)
(send state update-in [:out] empty)
state)
(do
(log frame)
(doseq [c @chats
buf (encode-frame
{:fin true
:op :text
:text (format "%s\u241e%s"
(:nick @state)
(.trim ^String (:text frame)))})]
(send c update-in [:out] conj (ByteBuffer/wrap buf))
(send c check-output))))
state)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.