Skip to content

Instantly share code, notes, and snippets.

Created March 2, 2015 08:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/b639bddfe1f7ea646b1f to your computer and use it in GitHub Desktop.
Save anonymous/b639bddfe1f7ea646b1f to your computer and use it in GitHub Desktop.
(sm/defn amq [] {})
(sm/defn amq-start [this]
(info "AsyncMemQueue start")
(assoc this
:publisher (async/chan 1024)
:publication (async/pub (:publisher this) first)))
;; -----------------------------------------------------------------------------
(sm/defn amp ^:always-validate
[config :- {(s/required-key :topic) String}]
config)
;; [queue topic]
(sm/defn amp-start [this]
(info "AsyncMemProducer start")
(assert (get-in this [:queue :publisher]) "queue has no publisher")
this)
(sm/defn amp-send-message [this msg]
;; XXX this can be source of deadlock, if buffer isn't big enough;
;; could use async/go around it instead.
(async/>!! (get-in this [:queue :publisher]) [(:topic this) msg]))
;; -----------------------------------------------------------------------------
(sm/defn amc ^:always-validate
[config :- {(s/required-key :topic) String}]
config)
;; [queue topic subscriber]
(sm/defn amc-start [this]
(info "AsyncMemConsumer start")
(assert (get-in this [:queue :publisher]) "queue has no publisher")
(assert (get-in this [:queue :publication]) "queue has no publication")
(let [s (async/chan 1024)
this (assoc this :subscriber s)]
(async/sub (get-in this [:queue :publication]) (:topic this) s)
this))
(sm/defn amc-recv-message [this]
(async/<!! (:subscriber this)))
;; =============================================================================
(def q (amq-start (amq)))
(def p (amp-start (merge {:queue q} (amp {:topic "#topic"}))))
(def c (amc-start (merge {:queue q} (amc {:topic "#topic"}))))
(amp-send-message p "foo")
(amc-recv-message c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment