Skip to content

Instantly share code, notes, and snippets.

@kballenegger
Created August 10, 2012 22:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kballenegger/1278d12a6792b7a2ff7a to your computer and use it in GitHub Desktop.
Save kballenegger/1278d12a6792b7a2ff7a to your computer and use it in GitHub Desktop.
(ns cb.broker
(:use [org.zeromq.clojure :as zmq]
)
(:require com.surftools.BeanstalkClient)
(:import com.surftools.BeanstalkClient.ClientImpl)
(:gen-class))
(defn trap
"traps a [signal] and executes a [function]"
[sig f]
(sun.misc.Signal/handle
(sun.misc.Signal. sig)
(proxy [sun.misc.SignalHandler] []
(handle [_]
(f)))))
(let [ctx (zmq/make-context 1)]
(let [make-output-sender
(fn [block endpoint]
(let [s (zmq/make-socket ctx zmq/+downstream+)
send (cond block (fn [payload] (zmq/send- s payload))
:else (fn [payload] (zmq/send- s payload zmq/+noblock+)))]
(zmq/bind s endpoint)
send))]
(defn make-persistent-queue
"returns a function which queues a payload in a persistent queue, while simultaneously starting a background thread which processes said queue"
[conf beanstalk-conf]
(.start (Thread. (fn []
(let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf) (:queue conf))
send (make-output-sender false (:endpoint conf))]
(.watch b (:queue conf))
(loop [job (.reserve b (* 60 5))]
(send (.getData job))
(.delete b (.getJobId job))
(recur))))))
(let [b (ClientImpl. (:host beanstalk-conf) (:port beanstalk-conf))]
(.useTube b (:queue conf))
(fn [p] (.put b 0 0 (* 60 5) p))))
(defn make-queue-or-doer
"based on an [output configuration] either create a named persistent queue or return a function that executes its argument"
[o beanstalk-conf]
(cond (nil? (:queue o))
(let [send (make-output-sender false (:endpoint o))]
(fn [p] (send p)))
:else (make-persistent-queue o beanstalk-conf))))
(defn make-receiver
"returns a function that listens on a specified [address] and executes a [function]"
[addr]
(let [in-socket (zmq/make-socket ctx zmq/+upstream+)]
(zmq/bind in-socket addr)
(fn [f] (f (zmq/recv in-socket)))))
(defn make-sender
"returns a function that sends a message over the sockets as specified in the [configuration]"
[conf]
(let [pub-socket (zmq/make-socket ctx zmq/+pub+)
out-queues (map #(make-queue-or-doer % (:beanstalk conf)) (:outputs conf))] ; and persistent queues
(.setHWM pub-socket 100) (zmq/bind pub-socket (:pub conf)) ; NOTE: keeping 100 messages in memory
(fn [payload]
(zmq/send- pub-socket payload zmq/+noblock+)
(let [head-match #(.equals (String. %2 0 (count %1)) %1 )]
(doseq [[conf queue] (map vector (:outputs conf) out-queues)]
(cond (head-match (:filter conf) payload)
(queue payload))))))))
(defn broker
"broker zmq sockets with [configuration]"
[conf]
(let [with-receiver (make-receiver (:input conf))
send (make-sender conf)]
(loop [] ; main loop
(let []
(with-receiver #(send %))
(recur)))))
(defn -main []
(trap "INT" (fn [] (println "Good-bye...") (System/exit 0)))
(broker {:input "tcp://0.0.0.0:13004"
:pub "tcp://0.0.0.0:13005"
:outputs [{:filter "pack" :endpoint "tcp://0.0.0.0:13006" :queue "some-id"}]
:beanstalk {:host "127.0.0.1" :port 12345}}))
[eye ~/Dropbox/dev/caffeine/server/jvm•jzmq]$ lein run -m cb.broker
Exception in thread "main" java.lang.ClassNotFoundException: cb.broker
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at clojure.lang.DynamicClassLoader.findClass(DynamicClassLoader.java:61)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:247)
at clojure.lang.RT.classForName(RT.java:2039)
at clojure.lang.Reflector.invokeStaticMethod(Reflector.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeStaticMethod(Reflector.java:207)
at user$eval56.invoke(NO_SOURCE_FILE:1)
at clojure.lang.Compiler.eval(Compiler.java:6511)
at clojure.lang.Compiler.eval(Compiler.java:6501)
at clojure.lang.Compiler.eval(Compiler.java:6477)
at clojure.core$eval.invoke(core.clj:2797)
at clojure.main$eval_opt.invoke(main.clj:297)
at clojure.main$initialize.invoke(main.clj:316)
at clojure.main$null_opt.invoke(main.clj:349)
at clojure.main$main.doInvoke(main.clj:427)
at clojure.lang.RestFn.invoke(RestFn.java:421)
at clojure.lang.Var.invoke(Var.java:419)
at clojure.lang.AFn.applyToHelper(AFn.java:163)
at clojure.lang.Var.applyTo(Var.java:532)
at clojure.main.main(main.java:37)
(defproject jvm "1.0.0-SNAPSHOT"
:source-path "src/clj"
:description "FIXME: write description"
:extra-classpath-dirs ["/usr/local/share/java/zmq.jar" "/usr/share/java/zmq.jar"]
:jvm-opts ["-Djava.library.path=/usr/local/lib:/usr/lib:/usr/lib64:/usr/local/lib64"]
:dependencies [[org.clojure/clojure "1.4.0"]
[org.clojars.mikejs/clojure-zmq "2.0.7-SNAPSHOT"]
;[com.github.drsnyder/beanstalk "1.0.0-SNAPSHOT"]
[com.surftools/BeanstalkClient "1.4.6"]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment