-
-
Save anonymous/b639bddfe1f7ea646b1f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(sm/defn amq
  "Create an empty, not-yet-started AsyncMemQueue component (a plain map)."
  []
  {})
(sm/defn amq-start
  "Start the in-memory queue component.

  Creates a publisher channel (buffer size 1024) and a publication over that
  same channel, using `first` as the topic function. Returns `this` with
  :publisher and :publication set."
  [this]
  (info "AsyncMemQueue start")
  ;; Bind the channel before building the map: the publication must wrap the
  ;; very channel stored under :publisher. Reading (:publisher this) here
  ;; would see the component as it was *before* the assoc (normally nil).
  (let [publisher (async/chan 1024)]
    (assoc this
           :publisher publisher
           :publication (async/pub publisher first))))
;; ----------------------------------------------------------------------------- | |
;; The :always-validate metadata has to sit on the function name; placed
;; after the name it decorates the argument vector and is ignored.
(sm/defn ^:always-validate amp
  "Build an AsyncMemProducer component from `config`.

  `config` is validated against a schema requiring a String :topic and is
  returned unchanged."
  [config :- {(s/required-key :topic) String}]
  config)
;; [queue topic] | |
(sm/defn amp-start
  "Start the producer component.

  Logs the start and asserts that the injected :queue already carries a
  :publisher channel. Returns `this` untouched."
  [this]
  (info "AsyncMemProducer start")
  (assert (get-in this [:queue :publisher]) "queue has no publisher")
  this)
(sm/defn amp-send-message
  "Publish `msg` under the producer's topic.

  Performs a blocking put of `[(:topic this) msg]` onto the queue's publisher
  channel and returns the result of that put."
  [this msg]
  ;; XXX this can be a source of deadlock if the channel buffer isn't big
  ;; enough; wrapping the put in async/go would be an alternative.
  (let [publisher (get-in this [:queue :publisher])
        envelope  [(:topic this) msg]]
    (async/>!! publisher envelope)))
;; ----------------------------------------------------------------------------- | |
;; The :always-validate metadata has to sit on the function name; placed
;; after the name it decorates the argument vector and is ignored.
(sm/defn ^:always-validate amc
  "Build an AsyncMemConsumer component from `config`.

  `config` is validated against a schema requiring a String :topic and is
  returned unchanged."
  [config :- {(s/required-key :topic) String}]
  config)
;; [queue topic subscriber] | |
(sm/defn amc-start
  "Start the consumer component.

  Asserts that the injected :queue has both a :publisher and a :publication,
  creates a subscriber channel (buffer size 1024), subscribes it to the
  queue's publication under (:topic this), and returns `this` with the
  channel stored under :subscriber."
  [this]
  (info "AsyncMemConsumer start")
  (assert (get-in this [:queue :publisher]) "queue has no publisher")
  (assert (get-in this [:queue :publication]) "queue has no publication")
  (let [inbox (async/chan 1024)]
    (async/sub (get-in this [:queue :publication]) (:topic this) inbox)
    (assoc this :subscriber inbox)))
(sm/defn amc-recv-message
  "Blocking take of the next value from the consumer's :subscriber channel."
  [this]
  (-> this :subscriber async/<!!))
;; ============================================================================= | |
;; Demo wiring: one queue, one producer and one consumer on the same topic.
(def q
  (-> (amq) amq-start))

(def p
  (->> (amp {:topic "#topic"})
       (merge {:queue q})
       amp-start))

(def c
  (->> (amc {:topic "#topic"})
       (merge {:queue q})
       amc-start))

;; Send one message through the producer, then block on the consumer's
;; subscriber channel for the next value.
(amp-send-message p "foo")
(amc-recv-message c)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment