Skip to content

Instantly share code, notes, and snippets.

Created July 30, 2014 16:48
Show Gist options
  • Save kballenegger/c7a9455649866ca84511 to your computer and use it in GitHub Desktop.
Save kballenegger/c7a9455649866ca84511 to your computer and use it in GitHub Desktop.
ZMQ message broker
(:require clojure.pprint
(:use [org.zeromq.clojure :as zmq]
[ :as cli]
[clj-yaml.core :as yaml])
(:import com.surftools.BeanstalkClient.Client
;; Signal-trapping helper: per its docstring, executes `f` when the signal
;; named by `sig` is trapped.
;; NOTE(review): this form is incomplete in this listing -- the call that
;; registers the proxy for the signal and the body of `handle` (presumably an
;; invocation of `f`) are not visible, and the form is never closed here.
(defn trap
"traps a [signal] and executes a [function]"
[sig f]
;; Signal object constructed from `sig` (the caller in -main passes "INT").
(sun.misc.Signal. sig)
;; Anonymous implementation of the sun.misc.SignalHandler interface.
(proxy [sun.misc.SignalHandler] []
;; `handle` receives the delivered signal and ignores it (bound to `_`).
(handle [_]
;; Last-resort error handler: prints a banner, prints the stack trace of `e`
;; via clojure.stacktrace, announces the crash, then terminates the JVM.
;; It does not return normally.
;; NOTE(review): the final line carries one more `)` than this defn opens; it
;; appears to close a form whose opening line is missing from this listing.
(defn exception-handler [e]
(println "Caught Exception:")
(clojure.stacktrace/print-stack-trace e)
(println "Crashing...")
;; Exit status 1 here; the "INT" handler installed by -main exits with 0.
(System/exit 1)))
;; ZeroMQ context used by the socket constructors in the definitions that follow.
;; NOTE(review): the argument 1 is presumably the I/O thread count -- confirm
;; against the zmq binding in use.
(let [ctx (zmq/make-context 1)]
;; make-output-sender: given a `block` flag and an `endpoint`, creates a
;; +downstream+ socket, builds a one-argument `send` closure over it and binds
;; the socket to `endpoint`.
;; NOTE(review): the tail of this fn (its return value and closing parens) is
;; missing from this listing; callers use the result as a one-argument send
;; function, so it presumably returns `send` -- confirm against the original.
(let [make-output-sender
(fn [block endpoint]
(let [s (zmq/make-socket ctx zmq/+downstream+)
;; With `block` truthy the send passes no flags; otherwise it passes +noblock+.
send (cond block (fn [payload] (zmq/send- s payload))
:else (fn [payload] (zmq/send- s payload zmq/+noblock+)))]
;; In blocking mode the socket's high-water mark is set to 1.
(if block (.setHWM s 1))
(zmq/bind s endpoint)
;; Builds a beanstalk-backed queue for one output configuration `conf`
;; (reads :queue and :endpoint) using the connection settings in
;; `beanstalk-conf` (reads :host and :port). Starts a consumer thread and
;; returns the producer function.
;; NOTE(review): the `catch` below has no visible `try`, and the `loop` has no
;; visible `recur`; those lines appear to be missing from this listing.
(defn make-persistent-queue
"returns a function which queues a payload in a persistent queue, while simultaneously starting a background thread which processes said queue"
[conf beanstalk-conf]
;; Consumer side: a new thread with its own beanstalk client and a blocking
;; output sender (block = true) bound to (:endpoint conf).
(.start (Thread. (fn []
(let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))
send (make-output-sender true (:endpoint conf))]
;; Consume from the tube named by (:queue conf).
(.watch b (:queue conf))
(loop []
;; Reserve a job with a timeout argument of 300.
;; NOTE(review): presumably seconds -- confirm against the BeanstalkClient API.
(let [job (.reserve b (int (* 60 5)))]
;; No job: sleep 300 ms. Otherwise forward the job's data and delete the job
;; only after `send` has returned.
(cond (nil? job) (Thread/sleep 300)
:else (do
(send (.getData job))
(.delete b (.getJobId job))))
;; Any Throwable on the consumer thread goes to exception-handler.
(catch Throwable e (exception-handler e))))))
;; Producer side: a second client, separate from the consumer thread's, that
;; puts into the same tube.
(let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))]
(.useTube b (:queue conf))
;; NOTE(review): the arguments 0 0 300 are presumably priority, delay and
;; time-to-run -- confirm against the BeanstalkClient `put` signature.
(fn [p] (.put b 0 0 (* 60 5) p))))
;; Chooses the delivery mechanism for one output configuration `o`.
;; NOTE(review): the lone `)` after this defn is reproduced from the original
;; listing, where this definition's last line carries one closing paren more
;; than the defn itself opens.
(defn make-queue-or-doer
  "based on an [output configuration] either create a named persistent queue or return a function that executes its argument"
  [o beanstalk-conf]
  (if (nil? (:queue o))
    ;; No :queue name: hand payloads straight to a non-blocking sender bound
    ;; to the output's endpoint.
    (let [direct-send (make-output-sender false (:endpoint o))]
      (fn [payload] (direct-send payload)))
    ;; Named queue: go through the beanstalk-backed persistent queue.
    (make-persistent-queue o beanstalk-conf)))
) ; surplus closer kept from the original final line
(defn make-receiver
  "Creates an upstream socket bound to `addr` and returns a function of one
  argument `f`. Each call receives one message from that socket and returns
  the result of `(f message)`."
  ;; The parameter vector was absent in the listing; without it `defn` reads
  ;; the `let` form as an arity clause and the definition cannot compile.
  ;; The body's free `addr` and the single-argument call in `broker` fix it.
  [addr]
  (let [in-socket (zmq/make-socket ctx zmq/+upstream+)]
    (zmq/bind in-socket addr)
    (fn [f] (f (zmq/recv in-socket)))))
;; NOTE(review): the lone `)` after this defn is reproduced from the original
;; listing, where this definition's last line carries one closing paren more
;; than the defn itself opens.
(defn make-sender
  "Returns a function of one `payload` that publishes it on a pub socket bound
  to (:pub conf) and then passes it to the queue function of every entry in
  (:outputs conf) whose :filter is a prefix of the payload.

  `conf` is read for :pub, :outputs and :beanstalk; each output is read for
  :filter here and handed to make-queue-or-doer."
  ;; The parameter vector was absent in the listing; without it `defn` reads
  ;; the `let` form as an arity clause and the definition cannot compile.
  ;; The body's free `conf` and the call `(make-sender conf)` fix it.
  [conf]
  (let [pub-socket (zmq/make-socket ctx zmq/+pub+)
        outputs    (:outputs conf)
        ;; doall forces creation of every output up front rather than lazily
        ;; on the first message.
        out-queues (doall (map #(make-queue-or-doer % (:beanstalk conf)) outputs)) ; and persistent queues
        ;; True when `prefix` equals the leading (count prefix) bytes of
        ;; `payload`, decoded with the platform default charset. A nil prefix
        ;; never matches: the empty string is compared against nil.
        ;; Built once here instead of once per message.
        head-match (fn [prefix payload]
                     (.equals (String. payload 0 (min (alength payload) (count prefix)))
                              prefix))]
    (.setHWM pub-socket 100) ; NOTE: keeping 100 messages in memory
    (zmq/bind pub-socket (:pub conf))
    (fn [payload]
      (zmq/send- pub-socket payload zmq/+noblock+)
      ;; `output` (not `conf`) so the outer configuration is not shadowed.
      (doseq [[output queue] (map vector outputs out-queues)]
        (when (head-match (:filter output) payload)
          (queue payload))))))
) ; surplus closer kept from the original final line
;; Main broker loop: wires a receiver to a sender.
;; NOTE(review): no parameter vector is visible between the docstring and the
;; body, although the body reads `conf` and -main calls (broker conf). The tail
;; of the loop (any `recur` and the closing parens) is also absent from this
;; listing.
(defn broker
"broker zmq sockets with [configuration]"
;; Receiver bound to (:input conf); sender built from the whole configuration.
(let [with-receiver (make-receiver (:input conf))
send (make-sender conf)]
(loop [] ; main loop
;; Receive one message and hand it to `send`.
(with-receiver #(send %))
;; Command-line entry point. Parses -c/--conf and --help. With --help it prints
;; the usage text; otherwise it loads the YAML configuration, echoes it,
;; installs an "INT" handler that says good-bye and exits 0, and runs the
;; broker, routing any Throwable to exception-handler.
(defn -main [& args]
  (let [[options _ usage] (cli/cli args
                                   ["-c" "--conf" "Configuration file" :default "../config/firehose-broker.yml"]
                                   ["--help" "Show this help" :flag true])]
    (if (true? (:help options))
      (println usage)
      (let [config (yaml/parse-string (slurp (:conf options)))]
        (println "Options are:")
        (clojure.pprint/pprint config)
        (trap "INT" (fn [] (println "Good-bye...") (System/exit 0)))
        (println "Server now running...")
        (try
          (broker config)
          (catch Throwable e
            (exception-handler e)))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment