Skip to content

Instantly share code, notes, and snippets.

@ajchemist
Last active August 29, 2015 14:27
Show Gist options
  • Save ajchemist/65d49329037d1ce9b4ce to your computer and use it in GitHub Desktop.
Save ajchemist/65d49329037d1ce9b4ce to your computer and use it in GitHub Desktop.
core.async
#_(boot.core/set-env!
:dependencies
'[[org.clojure/core.async "0.1.346.0-17112a-alpha" :scope "provided"]])
;; https://github.com/clojure/core.async/wiki/Pub-Sub
(ns async
(:refer-clojure :exclude [reduce map into partition partition-by merge take])
(:use clojure.core.async))
;; Publishing and subscribing is an oft-used metaphor to manage asynchronous events.
;; The pub function takes a channel and a topic function and returns a publication:
(def input-chan (chan))
(def our-pub (pub input-chan :msg-type))
;; A publication is not a channel:
#_(put! our-pub {:msg-type :greeting :text "hello"})
; => IllegalArgumentException No implementation of method put! found for class: clojure.core.async$pub$reify
;; Instead, we just push messages to the original channel:
(>!! input-chan {:msg-type :greeting :text "hello"})
;; Normally, with nothing drawing from input-chan, this would block, but the
;; implementation of pub creates two go blocks that will valiantly sacrifice
;; themselves and hang so the main thread doesn't have to, acting like size 1
;; buffers. This is an implementation detail, however, and should not be relied
;; upon. It's good to know the underlying workings, especially if you're playing
;; around experimenting with buffers.
;; The topic-fn is applied to all values that are passed to the publication. In the
;; above, :msg-type is being used as a fn, and it will return :greeting. The
;; topic-fn is used to categorize values, not modify them, a publication with inc as
;; its topic function would pass the values unchanged.
;; How do we receive messages? Here the sub function comes into play. It's called like so:
(def output-chan (chan))
(sub our-pub :greeting output-chan)
;; sub takes a publication, a topic, and a subscribing channel. The subscribing
;; channel will receive all values from the publication for which (= (topic-fn
;; value) topic):
(go-loop []
(let [{:keys [text]} (<! output-chan)]
(println text)
(recur)))
#_(go (let [{:keys [text]} (<! output-chan)] (println text)))
#_(go (let [_ (<! input-chan)] (println (pr-str _))))
;; Notice that although we put a value onto the input-chan earlier, nothing happens
;; right after we run this go block. This is because publications will drop values
;; that don't match a subscribed topic. Our original message was lost. But now that
;; there is a subscribed channel, new input will be published:
(>!! input-chan {:msg-type :greeting :text "hi"})
;; The publication will copy the input to as many channels as are subscribed, if we
;; create a duplicate channel subscribed to the same topic:
(let [c (chan)]
(sub our-pub :greeting c)
(go-loop []
(let [{:keys [msg-type text]} (<! c)]
(println "duplicate:" text ";")
(recur))))
;; and then put another message on input-chan, then both channels will receive it:
(>!! input-chan {:msg-type :greeting :text "new"})
;; It's worth noting that if a publication tries to pass to a channel that's not
;; accepting values, the whole publication will block:
(def loser-chan (chan))
(sub our-pub :loser loser-chan)
(>!! input-chan {:msg-type :loser :text "I won't be accepted"})
(>!! input-chan {:msg-type :greeting :text "new"}) ; => block thread
;; Careful: this will return true and won't block, because the publication is taking
;; values from input-chan. But inside the publication, a go block is hanging. The
;; >!! of a :loser value won't block the main thread either, but all following will.
;; The moral of the story is: if you subscribe a channel to a publication, make sure
;; it can accept values! You should be doing this anyway, but it's especially
;; important when dealing with publications, because you might not have any inkling
;; that something is wrong until some other topic is hanging.
;; Sometimes you can't do this---perhaps because the consuming operation is
;; resource-intensive. In these cases you can create topic-specific buffers: the pub
;; function allows a buf-fn argument that, given a topic, returns a buffer or a
;; number, which is then passed internally to chan. Your buffer-fn might even simply
;; be a map:
{:predict-election 100
:flip-a-coin 3}
;; You can get fancier with special buffers and buffer functions if you wish:
(def pub-central
(let [topic-fn #(case (:msg-type %)
:db-change :acid
:http-request :stateless)
buf-fn #(case %
:stateless (sliding-buffer 10)
:acid (dropping-buffer 1000))]
(pub request-source topic-fn buf-fn)))
;; As always, the documentation and source code are available f01(or your
;; perusal. Happy publishing!
(ns chat.core
(:require [cljs.core.async :refer [chan <! >! timeout pub sub unsub unsub-all]])
(:require-macros [cljs.core.async.macros :refer [go]]))
(def publisher (chan))
(def publication (pub publisher #(:topic %)))
(def subscriber-one (chan))
(def subscriber-two (chan))
(def subscriber-three (chan))
(sub publication :account-created subscriber-one)
(sub publication :account-created subscriber-two)
(sub publication :user-logged-in subscriber-two)
(sub publication :change-page subscriber-three)
(defn take-and-print [channel prefix]
(go-loop []
(println prefix ": " (<! channel))
(recur)))
(take-and-print subscriber-one "subscriber-one")
(take-and-print subscriber-two "subscriber-two")
(take-and-print subscriber-three "subscriber-three")
(go (>! publisher { :topic :change-page :dest "/#home" }))
(go (>! publisher { :topic :account-created :username "billy" }))
(go (>! publisher { :topic :user-logged-in :username "billy" }))
(go (>! publisher { :topic :user-logged-out :username "billy" }))
(unsub publication :account-created subscriber-two)
(go (>! publisher { :topic :account-created :username "billy" }))
(unsub-all publication :account-created)
(go (>! publisher { :topic :account-created :username "billy" }))
(unsub-all publication)
(go (>! publisher { :topic :change-page :dest "/#home" }))
(go (>! publisher { :topic :account-created :username "billy" }))
(go (>! publisher { :topic :user-logged-in :username "billy" }))
(go (>! publisher { :topic :user-logged-out :username "billy" }))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment