Last active
December 21, 2015 13:48
-
-
Save xudifsd/6314798 to your computer and use it in GitHub Desktop.
Using ZeroMQ (ZMQ) as a concurrent server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns load-blancer | |
(:import [org.zeromq ZMQ ZMQ$PollItem ZMQ$Poller])) | |
; Functionally the same as zmq-server.clj, but here the routing is handled manually
(defn dequeue!
  "Atomically removes and returns the oldest element of the vector held in
  queue-ref. enqueue! appends at the tail, so taking from the head gives
  FIFO order (the previous peek/pop pair took from the tail, i.e. LIFO).
  Returns nil, and leaves the ref holding [], when the queue is empty."
  [queue-ref]
  (dosync
    (let [q @queue-ref
          head (first q)]
      ;; (vec (rest q)) instead of subvec: a subvec keeps the whole backing
      ;; vector alive, which would grow without bound in a long-running loop.
      (ref-set queue-ref (vec (rest q)))
      head)))
(defn enqueue!
  "Atomically conj-es element onto the collection held in queue-ref and
  returns the updated collection."
  [queue-ref element]
  (dosync
    (alter queue-ref conj element)))
(defn assoc-dic!
  "Atomically associates k with v in the map held in map-ref and returns
  the updated map."
  [map-ref k v]
  (dosync
    (alter map-ref assoc k v)))
(defn dissoc-dic!
  "Atomically removes k from the map held in map-ref. Returns the value
  that was stored under k, or nil when k was not present."
  [map-ref k]
  (dosync
    (let [removed (get @map-ref k)]
      (alter map-ref dissoc k)
      removed)))
;; Payload a worker sends once after connecting to announce it is ready.
(def hi_boss "Hi, boss")

(defn- parse-delay-ms
  "Parses a request payload as a sleep duration in milliseconds.
  Returns 0 for payloads that are not a valid integer or are negative, so a
  malformed request cannot kill the worker thread that received it."
  [^String payload]
  (try
    (max 0 (Long/parseLong (.trim payload)))
    (catch NumberFormatException _ 0)))

;; Load balancer: clients talk to proxy-in (port 2013), workers talk to
;; proxy-out (port 2014). Both are ROUTER sockets, so every message arrives
;; as three frames: peer identity, empty delimiter, content.
(let [worker-queue (ref [])  ; identities of idle workers
      connect-dic (ref {})   ; worker identity (as a vector) -> client identity
      ctx (ZMQ/context 1)
      proxy-in (.socket ctx ZMQ/ROUTER)
      proxy-out (.socket ctx ZMQ/ROUTER)
      proxy-in-pollitem (ZMQ$PollItem. proxy-in ZMQ$Poller/POLLIN)
      proxy-out-pollitem (ZMQ$PollItem. proxy-out ZMQ$Poller/POLLIN)
      poller (doto (ZMQ$Poller. 2)
               (.register proxy-in-pollitem)
               (.register proxy-out-pollitem))
      ;; Byte arrays compare by reference, so they cannot be used directly as
      ;; map keys; a vector of the same bytes compares by value.
      id-key (fn [^bytes id] (vec id))]
  (.bind proxy-in "tcp://*:2013")
  (.bind proxy-out "tcp://*:2014")
  (let [handler (fn []
                  ;; Workers share the process-wide context instead of each
                  ;; creating one that is never terminated.
                  (let [sock (.socket ctx ZMQ/REQ)]
                    (.connect sock "tcp://127.0.0.1:2014")
                    ;; Blocking send: the READY message must not be dropped.
                    (.send sock (.getBytes hi_boss) 0)
                    (loop [i 1]
                      (let [recved (String. (.recv sock 0))]
                        (prn (str "worker in thread-loop" i " got " recved))
                        ;; Simulate work by sleeping for the requested time.
                        (Thread/sleep (long (parse-delay-ms recved)))
                        ;; Echo the request back as the result.
                        (.send sock (.getBytes recved) 0)
                        (recur (inc i))))))
        thread-pool (repeatedly 10 (fn [] (Thread. handler)))]
    (doall (map (memfn start) thread-pool))
    (loop [i 0]
      (prn (str "main-loop " i))
      (if (> (.poll poller 1000) 0) ; poll for 1 sec
        (cond
          ;; A client request is pending and an idle worker is available.
          (and (.pollin poller 0)
               (> (count @worker-queue) 0))
          (let [request-id (.recv proxy-in 0)
                empty-delimitate (.recv proxy-in 0)
                content (.recv proxy-in 0)
                worker-id (dequeue! worker-queue)]
            (.send proxy-out worker-id ZMQ/SNDMORE)
            (.send proxy-out (.getBytes "") ZMQ/SNDMORE)
            (.send proxy-out content 0)
            ;; Remember which client this worker is serving.
            (assoc-dic! connect-dic (id-key worker-id) request-id))

          ;; A worker sent either its READY message or a result.
          (.pollin poller 1)
          (let [worker-id (.recv proxy-out 0)
                empty-delimitate (.recv proxy-out 0)
                content (.recv proxy-out 0)]
            ;; Either way the worker is idle again.
            (enqueue! worker-queue worker-id)
            (if (= (String. content) hi_boss)
              (prn (str "got worker " (String. worker-id)))
              (let [peer (dissoc-dic! connect-dic (id-key worker-id))]
                ;; A result from a worker with no recorded client means the
                ;; bookkeeping is broken.
                (when (nil? peer)
                  (prn "fatal bug")
                  (System/exit 1))
                (.send proxy-in peer ZMQ/SNDMORE)
                (.send proxy-in (.getBytes "") ZMQ/SNDMORE)
                (.send proxy-in content 0))))

          ;; There are requests but no idle worker: back off briefly.
          :else
          (Thread/sleep 200))
        (Thread/sleep 200)) ; nothing ready: sleep for 0.2 sec
      (recur (inc i)))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# copied from http://http-kit.org/blog.html

# Edit /etc/security/limits.conf and add the following line (it is a
# limits.conf entry, not a shell command); log out and back in afterwards:
#   * - nofile 4999999

# Raise the kernel-wide limits on open file descriptors.
echo 9999999 | sudo tee /proc/sys/fs/nr_open
echo 9999999 | sudo tee /proc/sys/fs/file-max

# set before run the server and test code
ulimit -n 4999999

# More ports for test code to use
sudo sysctl -w net.ipv4.ip_local_port_range="1025 65535"

clojure zmq-server.clj & # start the server
clojure zmq-client.clj # print "Elapsed time: 12538.432267 msecs" in my laptop

# I don't know if this number is impressive or not, but it's impressive that
# you can write a concurrent server with such few code using ZMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(import 'org.zeromq.ZMQ) | |
;; Context shared by every client thread; 10 background IO threads.
(def ctx (ZMQ/context 10))

(defn client
  "Opens a REQ socket to tcp://127.0.0.1:2013, sends \"Hello world\", waits
  for the reply and closes the socket. Returns nil.
  The socket is closed even when connect/send/recv throws, so a failing
  client does not leak its socket."
  []
  (let [sock (.socket ctx ZMQ/REQ)]
    (try
      (.connect sock "tcp://127.0.0.1:2013")
      (.send sock (.getBytes "Hello world") 0)
      ; (prn "sent")
      (.recv sock 0)
      nil
      (finally
        (.close sock)))))
;; 50000 client threads. The lazy seq is realized by the first doall below
;; and cached, so the same Thread objects are started and then joined.
(def pool (repeatedly 50000 #(Thread. client)))

;; Time how long it takes to start every client and wait for all of them.
(time
  (do
    (doall (for [t pool] (.start t)))
    (doall (for [t pool] (.join t)))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(import 'org.zeromq.ZMQ) | |
;; Concurrent server: a ROUTER socket accepts client requests on port 2013
;; and ZMQ/proxy forwards them to REP workers through an in-process DEALER.
(let [ctx (ZMQ/context 1) ; 1 background IO worker
      proxy-in (.socket ctx ZMQ/ROUTER)
      proxy-out (.socket ctx ZMQ/DEALER)]
  (.bind proxy-in "tcp://*:2013")
  ;; Bound before the workers start, so their connect finds the endpoint.
  (.bind proxy-out "inproc://workers")
  (let [handler (fn [] ; worker function
                  (let [sock (.socket ctx ZMQ/REP)]
                    (.connect sock "inproc://workers")
                    (loop [i 1]
                      ;; The request content is not used; only its arrival
                      ;; matters.
                      (.recv sock 0)
                      ; (prn (str "times " i))
                      ;; Blocking send: with NOBLOCK a reply that cannot be
                      ;; queued immediately is dropped, leaving the client
                      ;; waiting forever.
                      (.send sock (.getBytes (str "times " i)) 0)
                      (recur (inc i)))))
        thread-pool (repeatedly 5 (fn [] (Thread. handler)))]
    (doall (map (memfn start) thread-pool)) ; start worker
    (prn "started worker, waiting in main thread")
    ;; Shuttles messages between the two sockets; blocks the main thread.
    (ZMQ/proxy proxy-in proxy-out nil)
    (doall (map (memfn join) thread-pool))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment