Skip to content

Instantly share code, notes, and snippets.

@kballenegger
Created July 19, 2012 02:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kballenegger/36b552afb7e9975d78e9 to your computer and use it in GitHub Desktop.
Save kballenegger/36b552afb7e9975d78e9 to your computer and use it in GitHub Desktop.
(ns cb.broker
(:require clojure.pprint)
(:use [org.zeromq.clojure :as zmq]
[clojure.tools.cli :as cli]
[clj-yaml.core :as yaml])
(:import com.surftools.BeanstalkClient.Client
com.surftools.BeanstalkClientImpl.ClientImpl)
(:gen-class))
(defn trap
  "Installs `f` (a zero-argument function) as the handler for the OS signal
  named by the string `sig`, e.g. \"INT\". The signal object passed to the
  handler is ignored. Returns whatever sun.misc.Signal/handle returns."
  [sig f]
  (let [signal  (sun.misc.Signal. sig)
        ;; SignalHandler is an interface, so reify is enough here.
        handler (reify sun.misc.SignalHandler
                  (handle [_this _signal]
                    (f)))]
    (sun.misc.Signal/handle signal handler)))
;; Every socket created below shares this single ZeroMQ context; the functions
;; defined inside the `let` close over it.
(let [ctx (zmq/make-context 1)]

  (let [make-output-sender
        ;; Creates a +downstream+ socket, binds it to `endpoint` and returns a
        ;; one-argument function that sends a payload on it. When `block` is
        ;; truthy the two-argument form of zmq/send- is used; otherwise
        ;; zmq/+noblock+ is passed explicitly.
        (fn [block endpoint]
          (let [s    (zmq/make-socket ctx zmq/+downstream+)
                send (if block
                       (fn [payload] (zmq/send- s payload))
                       (fn [payload] (zmq/send- s payload zmq/+noblock+)))]
            (zmq/bind s endpoint)
            send))]

    (defn make-persistent-queue
      "Returns a function that puts a payload onto the beanstalk tube named by
      (:queue conf). As a side effect, starts a background thread that watches
      the same tube and forwards every reserved job to a socket bound at
      (:endpoint conf). `beanstalk-conf` supplies :host and :port."
      [conf beanstalk-conf]
      ;; Consumer side: runs forever on its own thread, with its own client.
      (.start (Thread. (fn []
                         (let [b    (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))
                               emit (make-output-sender false (:endpoint conf))]
                           (.watch b (:queue conf))
                           (loop []
                             ;; 300 = 60*5; presumably the reserve timeout in seconds.
                             (let [job (.reserve b (int (* 60 5)))]
                               (if (nil? job)
                                 (Thread/sleep 300)
                                 (do
                                   ;; NOTE(review): the send is non-blocking and its
                                   ;; result is not checked before the job is deleted.
                                   ;; If a send can fail without throwing, that job is
                                   ;; lost - confirm against the zmq binding.
                                   (emit (.getData job))
                                   (.delete b (.getJobId job))))
                               (recur)))))))
      ;; Producer side: a separate client used by the returned function.
      (let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))]
        (.useTube b (:queue conf))
        ;; Arguments 0, 0, 300 are presumably priority, delay and time-to-run.
        (fn [p] (.put b 0 0 (* 60 5) p))))

    (defn make-queue-or-doer
      "Builds the delivery function for one output configuration `o`.
      Without a :queue key the result sends straight to a non-blocking socket
      bound at (:endpoint o); with one, delivery goes through
      make-persistent-queue using `beanstalk-conf`."
      [o beanstalk-conf]
      (if (nil? (:queue o))
        (let [send (make-output-sender false (:endpoint o))]
          (fn [p] (send p)))
        (make-persistent-queue o beanstalk-conf))))

  (defn make-receiver
    "Binds an +upstream+ socket to `addr` and returns a function that, given a
    one-argument function f, receives one message from the socket and returns
    the result of calling f on it."
    [addr]
    (let [in-socket (zmq/make-socket ctx zmq/+upstream+)]
      (zmq/bind in-socket addr)
      (fn [f] (f (zmq/recv in-socket)))))

  (defn make-sender
    "Returns a function of one payload that publishes it on a +pub+ socket
    bound at (:pub conf) and also hands it to every entry of (:outputs conf)
    whose :filter string is a prefix of the payload. Output delivery functions
    are created once, up front, with (:beanstalk conf)."
    [conf]
    (let [pub-socket (zmq/make-socket ctx zmq/+pub+)
          outputs    (:outputs conf)
          out-queues (doall (map #(make-queue-or-doer % (:beanstalk conf)) outputs)) ; and persistent queues
          ;; Pair each output config with its delivery function once, rather
          ;; than on every message.
          routes     (doall (map vector outputs out-queues))
          ;; True when the first (count prefix) bytes of payload decode to
          ;; prefix. The length guard keeps a payload shorter than the prefix
          ;; from throwing StringIndexOutOfBoundsException out of the main
          ;; loop. A nil prefix never matches (\"\" is not equal to nil).
          head-match (fn [prefix payload]
                       (let [n (count prefix)]
                         (and (<= n (count payload))
                              (.equals (String. payload 0 n) prefix))))]
      (.setHWM pub-socket 100) ; NOTE: keeping 100 messages in memory
      (zmq/bind pub-socket (:pub conf))
      (fn [payload]
        (zmq/send- pub-socket payload zmq/+noblock+)
        (doseq [[output queue] routes]
          (when (head-match (:filter output) payload)
            (queue payload)))))))
(defn broker
  "Runs the broker forever: builds a receiver on (:input conf) and a sender
  from `conf`, then repeatedly hands each received message to the sender.
  Never returns."
  [conf]
  (let [receive-with (make-receiver (:input conf))
        forward      (make-sender conf)]
    ;; main loop
    (while true
      (receive-with forward))))
(defn -main
  "Command-line entry point. With --help, prints the usage banner. Otherwise
  reads the YAML file named by -c/--conf, prints the parsed configuration,
  installs an INT handler that exits the process, and starts the broker."
  [& args]
  (let [[options _ banner]
        (cli/cli args
                 ["-c" "--conf" "Configuration file" :default "../config/firehose-broker.yml"]
                 ["--help" "Show this help" :flag true])]
    (if (true? (:help options))
      (println banner)
      (let [config (yaml/parse-string (slurp (:conf options)))]
        (println "Options are:")
        (clojure.pprint/pprint config)
        (trap "INT" (fn []
                      (println "Good-bye...")
                      (System/exit 0)))
        (println "Server now running...")
        (broker config)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment