Last active
August 29, 2015 14:27
-
-
Save ajchemist/65d49329037d1ce9b4ce to your computer and use it in GitHub Desktop.
core.async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#_(boot.core/set-env! | |
:dependencies | |
'[[org.clojure/core.async "0.1.346.0-17112a-alpha" :scope "provided"]]) | |
;; https://github.com/clojure/core.async/wiki/Pub-Sub | |
(ns async | |
(:refer-clojure :exclude [reduce map into partition partition-by merge take]) | |
(:use clojure.core.async)) | |
;; Publishing and subscribing is an oft-used metaphor to manage asynchronous events. | |
;; The pub function takes a channel and a topic function and returns a publication: | |
(def input-chan (chan)) | |
(def our-pub (pub input-chan :msg-type)) | |
;; A publication is not a channel: | |
#_(put! our-pub {:msg-type :greeting :text "hello"}) | |
; => IllegalArgumentException No implementation of method put! found for class: clojure.core.async$pub$reify | |
;; Instead, we just push messages to the original channel: | |
(>!! input-chan {:msg-type :greeting :text "hello"}) | |
;; Normally, with nothing drawing from input-chan, this would block, but the | |
;; implementation of pub creates two go blocks that will valiantly sacrifice | |
;; themselves and hang so the main thread doesn't have to, acting like size 1 | |
;; buffers. This is an implementation detail, however, and should not be relied | |
;; upon. It's good to know the underlying workings, especially if you're playing | |
;; around experimenting with buffers. | |
;; The topic-fn is applied to all values that are passed to the publication. In the | |
;; above, :msg-type is being used as a fn, and it will return :greeting. The | |
;; topic-fn is used to categorize values, not modify them, a publication with inc as | |
;; its topic function would pass the values unchanged. | |
;; How do we receive messages? Here the sub function comes into play. It's called like so: | |
(def output-chan (chan)) | |
(sub our-pub :greeting output-chan) | |
;; sub takes a publication, a topic, and a subscribing channel. The subscribing | |
;; channel will receive all values from the publication for which (= (topic-fn | |
;; value) topic): | |
(go-loop [] | |
(let [{:keys [text]} (<! output-chan)] | |
(println text) | |
(recur))) | |
#_(go (let [{:keys [text]} (<! output-chan)] (println text))) | |
#_(go (let [_ (<! input-chan)] (println (pr-str _)))) | |
;; Notice that although we put a value onto the input-chan earlier, nothing happens | |
;; right after we run this go block. This is because publications will drop values | |
;; that don't match a subscribed topic. Our original message was lost. But now that | |
;; there is a subscribed channel, new input will be published: | |
(>!! input-chan {:msg-type :greeting :text "hi"}) | |
;; The publication will copy the input to as many channels as are subscribed, if we | |
;; create a duplicate channel subscribed to the same topic: | |
(let [c (chan)] | |
(sub our-pub :greeting c) | |
(go-loop [] | |
(let [{:keys [msg-type text]} (<! c)] | |
(println "duplicate:" text ";") | |
(recur)))) | |
;; and then put another message on input-chan, then both channels will receive it: | |
(>!! input-chan {:msg-type :greeting :text "new"}) | |
;; It's worth noting that if a publication tries to pass to a channel that's not | |
;; accepting values, the whole publication will block: | |
(def loser-chan (chan)) | |
(sub our-pub :loser loser-chan) | |
(>!! input-chan {:msg-type :loser :text "I won't be accepted"}) | |
(>!! input-chan {:msg-type :greeting :text "new"}) ; => block thread | |
;; Careful: this will return true and won't block, because the publication is taking | |
;; values from input-chan. But inside the publication, a go block is hanging. The | |
;; >!! of a :loser value won't block the main thread either, but all following will. | |
;; The moral of the story is: if you subscribe a channel to a publication, make sure | |
;; it can accept values! You should be doing this anyway, but it's especially | |
;; important when dealing with publications, because you might not have any inkling | |
;; that something is wrong until some other topic is hanging. | |
;; Sometimes you can't do this---perhaps because the consuming operation is | |
;; resource-intensive. In these cases you can create topic-specific buffers: the pub | |
;; function allows a buf-fn argument that, given a topic, returns a buffer or a | |
;; number, which is then passed internally to chan. Your buffer-fn might even simply | |
;; be a map: | |
{:predict-election 100 | |
:flip-a-coin 3} | |
;; You can get fancier with special buffers and buffer functions if you wish: | |
(def pub-central | |
(let [topic-fn #(case (:msg-type %) | |
:db-change :acid | |
:http-request :stateless) | |
buf-fn #(case % | |
:stateless (sliding-buffer 10) | |
:acid (dropping-buffer 1000))] | |
(pub request-source topic-fn buf-fn))) | |
;; As always, the documentation and source code are available f01(or your | |
;; perusal. Happy publishing! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chat.core | |
(:require [cljs.core.async :refer [chan <! >! timeout pub sub unsub unsub-all]]) | |
(:require-macros [cljs.core.async.macros :refer [go]])) | |
(def publisher (chan)) | |
(def publication (pub publisher #(:topic %))) | |
(def subscriber-one (chan)) | |
(def subscriber-two (chan)) | |
(def subscriber-three (chan)) | |
(sub publication :account-created subscriber-one) | |
(sub publication :account-created subscriber-two) | |
(sub publication :user-logged-in subscriber-two) | |
(sub publication :change-page subscriber-three) | |
(defn take-and-print [channel prefix] | |
(go-loop [] | |
(println prefix ": " (<! channel)) | |
(recur))) | |
(take-and-print subscriber-one "subscriber-one") | |
(take-and-print subscriber-two "subscriber-two") | |
(take-and-print subscriber-three "subscriber-three") | |
(go (>! publisher { :topic :change-page :dest "/#home" })) | |
(go (>! publisher { :topic :account-created :username "billy" })) | |
(go (>! publisher { :topic :user-logged-in :username "billy" })) | |
(go (>! publisher { :topic :user-logged-out :username "billy" })) | |
(unsub publication :account-created subscriber-two) | |
(go (>! publisher { :topic :account-created :username "billy" })) | |
(unsub-all publication :account-created) | |
(go (>! publisher { :topic :account-created :username "billy" })) | |
(unsub-all publication) | |
(go (>! publisher { :topic :change-page :dest "/#home" })) | |
(go (>! publisher { :topic :account-created :username "billy" })) | |
(go (>! publisher { :topic :user-logged-in :username "billy" })) | |
(go (>! publisher { :topic :user-logged-out :username "billy" })) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment