@ykarikos
Last active August 29, 2015 14:13
Full-duplex WebSocket connections with Manifold streams and the Aleph HTTP library
; client REPL session
user=> (use 'ws-test)
nil
user=> @state
{}
user=> (connect "localhost")
true
user=> @@connection
<< stream: {:type "splice", :sink {:type "netty", :sink? true, :closed? false}, :source {:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 1, :buffer-capacity 16, :source? true}} >>
user=> (send-message @@connection {:msg "It works!" :count 1})
<< true >>
user=> @state
{}
user=> received {:count 2, :msg It works!}
user=> @state
{:msg "It works!", :count 2}
user=> Bye for now!
; project.clj
(defproject ws-test "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.clojure/data.json "0.2.5"]
                 [manifold "0.1.0-beta7"]
                 [aleph "0.4.0-alpha9"]])
; server REPL session
user=> (use 'ws-test)
nil
user=> @state
{}
user=> (start-server)
Server started.
nil
user=> received {:msg It works!, :count 1}
user=> @state
{:count 1, :msg "It works!"}
user=> (send-message server-out (swap! state merge {:count (+ 1 (:count @state))}))
<< true >>
user=> @state
{:count 2, :msg "It works!"}
user=> (.close @server)
nil
user=> Bye for now!
; src/ws_test.clj
; https://github.com/ztellman/aleph/
; https://github.com/ztellman/manifold/
(ns ws-test
  (:require [clojure.data.json :as json]
            [manifold.stream :as s]
            [aleph.http :as http]))
(def port 12345)
; server
(def state (atom {}))
(def server (atom nil))
(def server-out (s/stream))
; Parse a JSON string into data with keyword keys; return {} if parsing throws.
(defn- parse-json [s]
  (try
    (json/read-str s :key-fn keyword)
    (catch Exception _ {})))
; Print each incoming message and merge it into the shared state.
(defn- consumer [data]
  (let [json (parse-json data)]
    (println "received" json)
    (swap! state merge json)))
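; Upgrade the request to a WebSocket, pipe server-out into it,
; and pass everything the peer sends to consumer.
(defn- handler [req]
  (let [net @(http/websocket-connection req)]
    (s/connect server-out net)
    (s/consume consumer net)))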
(defn- started? []
  (some? @server))
(defn start-server []
  (if (started?)
    (println "Server already started")
    (do
      (reset! server (http/start-server handler {:port port}))
      (println "Server started."))))
; Serialize data as JSON and put it on the given stream.
(defn send-message [conn data]
  (s/put! conn (json/write-str data)))
; client
(def connection (atom nil))
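; connection holds the deferred returned by websocket-client,
; hence the double deref (@@connection) to reach the stream.
(defn connect [hostname]
  (reset! connection (http/websocket-client (str "ws://" hostname ":" port)))
  (s/consume consumer @@connection))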
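
The server session shuts down with `(.close @server)`, which leaves the closed server in the `server` atom. `started?` therefore keeps returning true, and a later `(start-server)` would print "Server already started". Below is a minimal sketch of a helper that closes the server and clears the atom. The name `stop-server` is hypothetical and not part of the original gist. It takes the atom as an argument so the snippet stands alone, and it assumes only that the held object has a no-argument `.close` method, as the transcript suggests.

```clojure
; Hypothetical helper, not in the original gist.
; Closes whatever server-atom holds (if anything) and resets the atom to nil.
; Returns true if a server was closed, false if the atom was already empty.
(defn stop-server [server-atom]
  (if-let [srv @server-atom]
    (do
      (.close srv)               ; reflective call; assumes a no-arg close method
      (reset! server-atom nil)   ; so that started? reports false again
      true)
    false))
```

With the definitions from `src/ws_test.clj` loaded, this would be called as `(stop-server server)` in place of `(.close @server)`.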