-
-
Save kballenegger/36b552afb7e9975d78e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cb.broker | |
(:require clojure.pprint) | |
(:use [org.zeromq.clojure :as zmq] | |
[clojure.tools.cli :as cli] | |
[clj-yaml.core :as yaml]) | |
(:import com.surftools.BeanstalkClient.Client | |
com.surftools.BeanstalkClientImpl.ClientImpl) | |
(:gen-class)) | |
(defn trap
  "Installs a handler for the signal named [sig] (e.g. \"INT\") that calls the
  zero-argument function [f] each time the signal is delivered. Returns
  whatever sun.misc.Signal/handle returns."
  [sig f]
  (let [signal  (sun.misc.Signal. sig)
        ;; SignalHandler is an interface, so reify is enough; the signal object
        ;; passed to `handle` is ignored and `f` is invoked with no arguments.
        handler (reify sun.misc.SignalHandler
                  (handle [_ _signal]
                    (f)))]
    (sun.misc.Signal/handle signal handler)))
;; Everything that creates a ZeroMQ socket is defined inside this `let` so that
;; all sockets share the single context `ctx`.
(let [ctx (zmq/make-context 1)]

  ;; `make-output-sender` is only visible to the two queue constructors below.
  (let [make-output-sender
        ;; Binds a +downstream+ socket to `endpoint` and returns a one-argument
        ;; function that sends a payload on it. When `block` is falsey the send
        ;; is issued with the +noblock+ flag; otherwise no flag is passed.
        (fn [block endpoint]
          (let [s    (zmq/make-socket ctx zmq/+downstream+)
                send (if block
                       (fn [payload] (zmq/send- s payload))
                       (fn [payload] (zmq/send- s payload zmq/+noblock+)))]
            (zmq/bind s endpoint)
            send))]

    (defn make-persistent-queue
      "Returns a one-argument function that puts a payload into the beanstalk
      tube named (:queue conf). As a side effect, starts a background thread
      that watches the same tube, forwards each reserved job's data to a
      non-blocking output socket bound to (:endpoint conf), and then deletes
      the job.

      [conf] is one output configuration (:queue, :endpoint);
      [beanstalk-conf] supplies :host and :port of the beanstalk server."
      [conf beanstalk-conf]
      ;; Consumer side: its own beanstalk connection, living on its own thread.
      (.start
       (Thread.
        (fn []
          (let [b    (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))
                send (make-output-sender false (:endpoint conf))]
            (.watch b (:queue conf))
            (loop []
              ;; presumably a 300 second reserve timeout -- TODO confirm units
              (if-let [job (.reserve b (int (* 60 5)))]
                (do
                  ;; NOTE(review): the job is deleted regardless of what the
                  ;; non-blocking send returned; if a failed send should keep
                  ;; the job queued, check the result here -- confirm the
                  ;; return contract of zmq/send- first.
                  (send (.getData job))
                  (.delete b (.getJobId job)))
                ;; nothing reserved: back off briefly before trying again
                (Thread/sleep 300))
              (recur))))))
      ;; Producer side: a second connection used by the returned function.
      (let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))]
        (.useTube b (:queue conf))
        ;; presumably priority 0, delay 0, time-to-run 300 -- TODO confirm
        (fn [p] (.put b 0 0 (* 60 5) p))))

    (defn make-queue-or-doer
      "Based on an [output configuration] either creates a named persistent
      queue (when the output has a :queue) or returns a function that sends
      its argument straight to a non-blocking socket bound to (:endpoint o)."
      [o beanstalk-conf]
      (if (nil? (:queue o))
        (make-output-sender false (:endpoint o))
        (make-persistent-queue o beanstalk-conf))))

  (defn make-receiver
    "Binds an +upstream+ socket to [addr] and returns a function that, given a
    one-argument function f, calls f with the result of one zmq/recv on that
    socket and returns f's result."
    [addr]
    (let [in-socket (zmq/make-socket ctx zmq/+upstream+)]
      (zmq/bind in-socket addr)
      (fn [f] (f (zmq/recv in-socket)))))

  (defn make-sender
    "Returns a one-argument function that publishes a payload on a +pub+ socket
    bound to (:pub conf) and additionally hands it to every output in
    (:outputs conf) whose :filter string is a prefix of the payload.

    Outputs are created eagerly (via make-queue-or-doer) when make-sender is
    called. An output whose :filter is nil never matches; a payload shorter
    than a filter simply does not match."
    [conf]
    (let [pub-socket (zmq/make-socket ctx zmq/+pub+)
          outputs    (:outputs conf)
          ;; realised eagerly so sockets/threads exist before the first message
          out-queues (doall (map #(make-queue-or-doer % (:beanstalk conf)) outputs))
          ;; pair each output configuration with its queue function once,
          ;; instead of rebuilding the pairing for every payload
          routes     (doall (map vector outputs out-queues))
          ;; True when `payload` starts with the string `prefix`. The length
          ;; check comes first because String(bytes, offset, length) throws
          ;; when length exceeds the array, which previously let one short
          ;; message raise out of the sender.
          ;; NOTE(review): prefix length is counted in characters but used as
          ;; a byte count -- fine for single-byte filters; confirm filters are
          ;; ASCII.
          head-match (fn [prefix payload]
                       (let [n (count prefix)]
                         (and (<= n (count payload))
                              (.equals (String. payload 0 n) prefix))))]
      ;; original author's note: keeping 100 messages in memory
      (.setHWM pub-socket 100)
      (zmq/bind pub-socket (:pub conf))
      (fn [payload]
        (zmq/send- pub-socket payload zmq/+noblock+)
        (doseq [[output queue] routes]
          (when (head-match (:filter output) payload)
            (queue payload)))))))
(defn broker
  "Brokers zmq sockets with [configuration]: builds a receiver on
  (:input conf) and a sender from conf, then loops forever handing each
  received message to the sender. Only leaves the loop if something throws."
  [conf]
  (let [receive-with (make-receiver (:input conf))
        forward      (make-sender conf)]
    ;; main loop: one receive-and-forward per iteration
    (while true
      (receive-with forward))))
(defn -main
  "Command-line entry point. Parses -c/--conf (path to the YAML configuration)
  and --help. With --help, prints the usage text; otherwise loads and prints
  the configuration, installs an INT handler that exits with status 0, and
  starts the broker."
  [& args]
  (let [[opts _ help] (cli/cli args
                               ["-c" "--conf" "Configuration file" :default "../config/firehose-broker.yml"]
                               ["--help" "Show this help" :flag true])]
    (if (true? (:help opts))
      (println help)
      (let [conf         (-> opts :conf slurp yaml/parse-string)
            on-interrupt (fn []
                           (println "Good-bye...")
                           (System/exit 0))]
        (println "Options are:")
        (clojure.pprint/pprint conf)
        (trap "INT" on-interrupt)
        (println "Server now running...")
        (broker conf)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment