Created
July 30, 2014 16:48
-
-
Save kballenegger/c7a9455649866ca84511 to your computer and use it in GitHub Desktop.
ZMQ message broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cb.broker | |
(:require clojure.pprint | |
clojure.stacktrace) | |
(:use [org.zeromq.clojure :as zmq] | |
[clojure.tools.cli :as cli] | |
[clj-yaml.core :as yaml]) | |
(:import com.surftools.BeanstalkClient.Client | |
com.surftools.BeanstalkClientImpl.ClientImpl) | |
(:gen-class)) | |
(defn trap
  "Installs `f` as the handler for the signal named `sig`.

  `sig` is a signal name string such as \"INT\". `f` is a zero-argument
  function; it is invoked every time the handler fires, and the signal object
  handed to the handler is ignored. Returns the result of
  sun.misc.Signal/handle."
  [sig f]
  (let [signal  (sun.misc.Signal. sig)
        handler (reify sun.misc.SignalHandler
                  (handle [_this _signal]
                    (f)))]
    (sun.misc.Signal/handle signal handler)))
(defn exception-handler
  "Reports the throwable `e` and terminates the JVM.

  Prints a banner, the stack trace of `e` and a closing notice to *out*, then
  calls System/exit with status 1, so this function never returns."
  [e]
  (println "Caught Exception:")
  (clojure.stacktrace/print-stack-trace e)
  (println "Crashing...")
  (System/exit 1))
;; Every socket created by the functions in this form shares one ZeroMQ
;; context, created once when the namespace is loaded.
(let [ctx (zmq/make-context 1)]
  (let [make-output-sender
        ;; Local helper used only by the two queue builders below.
        ;; Creates a +downstream+ socket, binds it on `endpoint` and returns a
        ;; one-argument function that sends a payload on it.
        ;;   block truthy -> high-water mark set to 1, sends use default flags
        ;;   block falsy  -> sends pass +noblock+
        (fn [block endpoint]
          (let [s    (zmq/make-socket ctx zmq/+downstream+)
                send (if block
                       (fn [payload] (zmq/send- s payload))
                       (fn [payload] (zmq/send- s payload zmq/+noblock+)))]
            (when block (.setHWM s 1))
            (zmq/bind s endpoint)
            send))]

    (defn make-persistent-queue
      "Returns a one-argument function that enqueues a payload in beanstalk, and
      starts a background thread that drains that queue to a ZeroMQ socket.

      `conf` is one output configuration; its :queue names the beanstalk tube
      and its :endpoint is where the draining socket is bound.
      `beanstalk-conf` supplies :host and :port of the beanstalk server.

      Two separate beanstalk connections are opened: one inside the worker
      thread (watching the tube) and one for the returned enqueue function
      (using the tube). Any Throwable raised in the worker thread is passed to
      `exception-handler`, which terminates the process."
      [conf beanstalk-conf]
      (.start
       (Thread.
        (fn []
          (try
            (let [b    (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))
                  send (make-output-sender true (:endpoint conf))]
              (.watch b (:queue conf))
              (loop []
                ;; presumably a 300 s reserve timeout -- confirm against the
                ;; BeanstalkClient API
                (let [job (.reserve b (int (* 60 5)))]
                  (if (nil? job)
                    ;; nothing reserved: pause 300 ms before trying again
                    (Thread/sleep 300)
                    (do
                      ;; delete only after the send call has returned
                      (send (.getData job))
                      (.delete b (.getJobId job))))
                  (recur))))
            (catch Throwable e (exception-handler e))))))
      (let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))]
        (.useTube b (:queue conf))
        ;; presumably priority 0, delay 0, time-to-run 300 s -- confirm against
        ;; the BeanstalkClient API
        (fn [p] (.put b 0 0 (* 60 5) p))))

    (defn make-queue-or-doer
      "Builds the delivery function for one output configuration `o`.

      When `o` has no :queue, returns a non-blocking sender bound on
      (:endpoint o). Otherwise returns the enqueue function produced by
      `make-persistent-queue` for `o` and `beanstalk-conf`. Either way the
      result is a function of one argument, the payload."
      [o beanstalk-conf]
      (if (nil? (:queue o))
        (make-output-sender false (:endpoint o))
        (make-persistent-queue o beanstalk-conf))))

  (defn make-receiver
    "Binds an +upstream+ socket on `addr` and returns a function of one
    argument `f`.

    Each call to the returned function receives one message from the socket
    with zmq/recv and returns the result of calling `f` on it."
    [addr]
    (let [in-socket (zmq/make-socket ctx zmq/+upstream+)]
      (zmq/bind in-socket addr)
      (fn [f] (f (zmq/recv in-socket)))))

  (defn make-sender
    "Returns a function that fans one payload (a byte array) out to every
    configured destination.

    `conf` is the whole broker configuration:
      :pub       endpoint the +pub+ socket is bound on
      :outputs   sequence of output configurations; each has a :filter and is
                 turned into a delivery function by `make-queue-or-doer`
      :beanstalk beanstalk settings passed on to `make-queue-or-doer`

    The returned function first publishes the payload on the +pub+ socket
    without blocking, then hands it to every output whose :filter is a prefix
    of the payload. The prefix test compares raw bytes: the filter is encoded
    once, with the platform default charset, when the sender is built."
    [conf]
    (let [pub-socket (zmq/make-socket ctx zmq/+pub+)
          outputs    (:outputs conf)
          out-queues (doall (map #(make-queue-or-doer % (:beanstalk conf)) outputs)) ; and persistent queues
          ;; Builds the predicate for one filter up front, so nothing is
          ;; re-derived per message.
          prefix-matcher
          (fn [flt]
            (if (nil? flt)
              ;; NOTE(review): an output without :filter never receives
              ;; anything; this keeps the existing behaviour -- confirm intent.
              (constantly false)
              (let [^bytes prefix (.getBytes ^String flt)
                    n             (alength prefix)]
                (fn [^bytes payload]
                  (and (<= n (alength payload))
                       (java.util.Arrays/equals
                        prefix
                        (java.util.Arrays/copyOfRange payload 0 n)))))))
          routes
          (doall (map (fn [output queue] [(prefix-matcher (:filter output)) queue])
                      outputs out-queues))]
      (.setHWM pub-socket 100) ; NOTE: keeping 100 messages in memory
      (zmq/bind pub-socket (:pub conf))
      (fn [payload]
        (zmq/send- pub-socket payload zmq/+noblock+)
        (doseq [[matches? queue] routes]
          (when (matches? payload)
            (queue payload)))))))
(defn broker
  "Runs the broker's main loop for the parsed configuration `conf`.

  Builds a receiver from (:input conf) and a sender from `conf`, then forwards
  every received message to the sender, forever. Does not return normally."
  [conf]
  (let [receive-with (make-receiver (:input conf))
        forward      (make-sender conf)]
    ;; main loop
    (while true
      (receive-with forward))))
(defn -main
  "Command-line entry point.

  Options:
    -c / --conf  path of the YAML configuration file
                 (default \"../config/firehose-broker.yml\")
    --help       print the usage text and return

  Without --help: loads and pretty-prints the configuration, installs an INT
  handler that prints a farewell and exits with status 0, then runs `broker`.
  Any Throwable escaping `broker` is handed to `exception-handler`."
  [& args]
  (let [[opts _ help] (cli/cli args
                               ["-c" "--conf" "Configuration file" :default "../config/firehose-broker.yml"]
                               ["--help" "Show this help" :flag true])]
    (if (true? (:help opts))
      (println help)
      (let [conf (-> opts :conf slurp yaml/parse-string)]
        (println "Options are:")
        (clojure.pprint/pprint conf)
        (trap "INT" #(do (println "Good-bye...")
                         (System/exit 0)))
        (println "Server now running...")
        (try
          (broker conf)
          (catch Throwable e
            (exception-handler e)))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment