Skip to content

Instantly share code, notes, and snippets.

@xudifsd
Last active December 21, 2015 13:48
Show Gist options
  • Save xudifsd/6314798 to your computer and use it in GitHub Desktop.
Save xudifsd/6314798 to your computer and use it in GitHub Desktop.
Use ZMQ as a concurrent server
(ns load-blancer
(:import [org.zeromq ZMQ ZMQ$PollItem ZMQ$Poller]))
; This provides the same functionality as zmq-server.clj, but here we handle the routing manually.
(defn dequeue!
  "Atomically removes and returns the oldest element of the vector held in
  `queue-ref`, i.e. the element at the front, which `conj` added earliest
  (FIFO order). When the collection is empty (or nil) the ref is set to an
  empty vector and nil is returned."
  [queue-ref]
  (dosync
    (let [q @queue-ref]
      (if (empty? q)
        (do (ref-set queue-ref [])
            nil)
        (let [result (first q)]
          ;; Rebuild a real vector so that later `conj` calls keep appending
          ;; at the tail and the front stays the oldest element.
          (ref-set queue-ref (vec (rest q)))
          result)))))
(defn enqueue!
  "Adds `element` to the collection held in `queue-ref` with `conj`, inside a
  transaction, and returns the updated collection."
  [queue-ref element]
  (dosync
    (alter queue-ref conj element)))
(defn assoc-dic!
  "Associates `k` with `v` in the map held in `map-ref`, inside a transaction,
  and returns the updated map."
  [map-ref k v]
  (dosync
    (alter map-ref assoc k v)))
(defn dissoc-dic!
  "Removes `k` from the map held in `map-ref`, inside a transaction, and
  returns the value that was stored under `k` (nil when there was none)."
  [map-ref k]
  (dosync
    (let [before @map-ref]
      (alter map-ref dissoc k)
      (get before k))))
;; Message a worker sends once, right after connecting, to announce itself.
(def hi_boss "Hi, boss")

;; Hand-routed load balancer: clients connect to the ROUTER bound on port 2013,
;; workers connect to the ROUTER bound on port 2014. The main loop forwards each
;; client request to an idle worker and sends the worker's answer back to the
;; client that asked.
(let [worker-queue (ref [])  ; identities of workers that are currently idle
      connect-dic (ref {})   ; worker key -> identity of the client it is serving
      ctx (ZMQ/context 1)
      ;; Socket identities arrive as byte arrays, which compare by reference
      ;; and so cannot be used directly as map keys. A vector of the same
      ;; bytes compares by value. This replaces sun.misc.BASE64Encoder, a
      ;; JDK-internal class that is not available on newer JDKs.
      worker-key (fn [id] (vec id))
      proxy-in (.socket ctx ZMQ/ROUTER)
      proxy-out (.socket ctx ZMQ/ROUTER)
      proxy-in-pollitem (ZMQ$PollItem. proxy-in ZMQ$Poller/POLLIN)
      proxy-out-pollitem (ZMQ$PollItem. proxy-out ZMQ$Poller/POLLIN)
      poller (doto (ZMQ$Poller. 2)
               (.register proxy-in-pollitem)
               (.register proxy-out-pollitem))]
  (.bind proxy-in "tcp://*:2013")
  (.bind proxy-out "tcp://*:2014")
  (let [handler (fn []
                  (let [sock (.socket (ZMQ/context 1) ZMQ/REQ)]
                    (.connect sock "tcp://127.0.0.1:2014")
                    ;; Blocking send: the next step is a blocking recv, so a
                    ;; silently dropped registration would hang this worker.
                    (.send sock (.getBytes hi_boss) 0)
                    (loop [i 1]
                      (let [recved (String. (.recv sock 0))]
                        (prn (str "worker in thread-loop" i " got " recved))
                        ;; The payload is read as a number of milliseconds to
                        ;; sleep (simulated work). A non-numeric payload used
                        ;; to kill the worker thread and leave the client
                        ;; without an answer; now it simply skips the sleep.
                        (try
                          (Thread/sleep (long (Integer/parseInt recved)))
                          (catch NumberFormatException _ nil))
                        (.send sock (.getBytes recved) 0)
                        (recur (inc i))))))
        thread-pool (repeatedly 10 (fn [] (Thread. handler)))]
    (doseq [^Thread t thread-pool]
      (.start t))
    (loop [i 0]
      (prn (str "main-loop " i))
      (if (> (.poll poller 1000) 0) ; poll for 1 sec
        (cond
          ;; a client request is waiting and a worker is idle
          (and (.pollin poller 0)
               (> (count @worker-queue) 0))
          (let [request-id (.recv proxy-in 0)
                _ (.recv proxy-in 0) ; empty delimiter frame
                content (.recv proxy-in 0)
                worker-id (dequeue! worker-queue)]
            (.send proxy-out worker-id ZMQ/SNDMORE)
            (.send proxy-out (.getBytes "") ZMQ/SNDMORE)
            (.send proxy-out content 0)
            ;; remember which client this worker is answering
            (assoc-dic! connect-dic (worker-key worker-id) request-id))

          ;; a worker sent either its greeting or a result
          (.pollin poller 1)
          (let [worker-id (.recv proxy-out 0)
                _ (.recv proxy-out 0) ; empty delimiter frame
                content (.recv proxy-out 0)]
            ;; in both cases the worker is idle again
            (enqueue! worker-queue worker-id)
            (if (= (String. content) hi_boss)
              (prn (str "got worker " (String. worker-id)))
              (let [peer (dissoc-dic! connect-dic (worker-key worker-id))]
                ;; a result from a worker with no recorded client means the
                ;; routing table is corrupt
                (when (nil? peer)
                  (prn "fatal bug")
                  (System/exit 1))
                (.send proxy-in peer ZMQ/SNDMORE)
                (.send proxy-in (.getBytes "") ZMQ/SNDMORE)
                (.send proxy-in content 0))))

          :else ; a request is pending but no worker is idle
          (Thread/sleep 200))
        (Thread/sleep 200)) ; sleep for 0.2 sec
      (recur (inc i)))))
#!/bin/bash
# Copied from http://http-kit.org/blog.html
#
# One-time manual step: edit /etc/security/limits.conf and add the line below,
# then log out and log in again. It is kept as a comment because it is file
# content, not a shell command (a bare `*` would be glob-expanded and executed).
#   * - nofile 4999999
echo 9999999 | sudo tee /proc/sys/fs/nr_open
echo 9999999 | sudo tee /proc/sys/fs/file-max
# Set this before running the server and the test code
ulimit -n 4999999
# More ports for the test code to use
sudo sysctl -w net.ipv4.ip_local_port_range="1025 65535"
clojure zmq-server.clj & # start the server
clojure zmq-client.clj # prints "Elapsed time: 12538.432267 msecs" on my laptop
# I don't know whether this number is impressive or not, but it is impressive
# that you can write a concurrent server with so little code using ZMQ
(import 'org.zeromq.ZMQ)
; ZMQ context used by every client socket in this script; the argument 10 asks
; for 10 background IO workers.
(def ctx (ZMQ/context 10));10 background IO worker
(defn client
  "Runs one request/reply round trip: opens a REQ socket on the shared `ctx`,
  connects to tcp://127.0.0.1:2013, sends \"Hello world\", waits for the reply
  and closes the socket. Returns nil. The socket is closed even when
  connecting, sending or receiving throws, so a failed round trip does not
  leak it."
  []
  (let [sock (.socket ctx ZMQ/REQ)]
    (try
      (.connect sock "tcp://127.0.0.1:2013")
      (.send sock (.getBytes "Hello world") 0)
      ; (prn "sent")
      (.recv sock 0)
      (finally
        (.close sock)))
    nil))
; 50000 client threads. The sequence is lazy, so the Thread objects are created
; when it is first walked, inside the timed section below.
(def pool (repeatedly 50000 #(Thread. client)))
; Time how long it takes to start every client thread and wait for all of them
; to finish.
(time
  (do
    (doseq [^Thread t pool]
      (.start t))
    (doseq [^Thread t pool]
      (.join t))))
(import 'org.zeromq.ZMQ)
;; Concurrent server: a ROUTER socket accepts clients on tcp port 2013, a
;; DEALER socket fans requests out to worker threads over inproc, and
;; ZMQ/proxy shuttles messages between the two.
(let [ctx (ZMQ/context 1) ; 1 background IO worker
      proxy-in (.socket ctx ZMQ/ROUTER)
      proxy-out (.socket ctx ZMQ/DEALER)]
  (.bind proxy-in "tcp://*:2013")
  (.bind proxy-out "inproc://workers")
  (let [handler (fn [] ; worker function
                  (let [sock (.socket ctx ZMQ/REP)]
                    (.connect sock "inproc://workers")
                    (loop [i 1]
                      (let [recved-string (String. (.recv sock 0))]
                        ; (prn recved-string)
                        ; (prn (str "times " i))
                        ;; Blocking send: with NOBLOCK and an ignored result, a
                        ;; reply that could not be queued was silently dropped
                        ;; and the client would wait forever.
                        (.send sock (.getBytes (str "times " i)) 0)
                        (recur (inc i))))))
        thread-pool (repeatedly 5 (fn []
                                    (Thread. handler)))]
    (doall (map (memfn start) thread-pool)) ; start workers
    (prn "started worker, waiting in main thread")
    ;; forwards messages between the client-facing and worker-facing sockets
    (ZMQ/proxy proxy-in proxy-out nil)
    (doall (map (memfn join) thread-pool))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment