Skip to content

Instantly share code, notes, and snippets.

@mlapshin
Created September 12, 2014 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mlapshin/c3054c626a1280ec0381 to your computer and use it in GitHub Desktop.
Save mlapshin/c3054c626a1280ec0381 to your computer and use it in GitHub Desktop.
(ns clojure-async-example.core
(:require [clojure.core.async :as async :refer [chan dropping-buffer sliding-buffer
go go-loop thread
<! >! >!! <!! take!
alts! alts!!
timeout
close!]]))
;; core.async:
;; implementation of CSP (like Go lang)
;; works in ClojureScript (awesome!)
;; similar to Actor model, but with important differences
;; CHANNELS
;; most basic primitive of core.async
(def ch (chan))
;; blocking put to channel
(>!! ch 42)
;; blocking read from channel
(<!! ch)
;; close channel
(close! ch)
;; closed channel ignores puts
;; closed channel returns nil on reads
;; channels works with future (native threads)
(let [fchan (chan)]
(future (println "got" (<!! fchan)))
(future (>!! fchan 42)))
;; core.async/thread - future which returs channel!
(println "got" (<!! (thread 42)))
;; GO BLOCKS
(<!! (go 42))
;; go block is a macro which implements IOC to translate
;; synchronous code into callback state-machine.
;; go blocks returns channels too
(clojure.pprint/pprint (macroexpand '(go 42)))
(<!! (go (<! (go 42))))
;; <! and >! are non-blocking equivalents of <!! and >!!
;; use them in go blocks
;; use <!! and >!! in threads
;; BUFFERS
(def nbc (chan)) ;; same as (def nbc (chan 1))
(go (println "got" (<! nbc)))
(go (println "got" (<! nbc)))
(>!! nbc 42)
(>!! nbc 43)
;; dropping buffer (drop newest)
(def dropping-channel (chan (dropping-buffer 5)))
;; sliding buffer (drop oldest)
(def dropping-channel (chan (sliding-buffer 5)))
;; TIMEOUTS
;; timeouts are channels (awesome!)
(<!! (timeout 1000))
;; DOM animation in ClojureScript
(go-loop [x 100]
(set-element-position ($ "#animated-stuff") x 50)
(<! (timeout 50))
(when (> x 0) (recur [(dec x)])))
;; when timeout interval is passed, timeout channel will close
;; and you'll got nil
;; ALTS
;; alts! will read first available value from many channels
(let [ch (chan)
ch2 (chan)]
(go (println "got" (<! (go (alts! [ch ch2])))))
(>!! ch2 42))
;; alts are very useful with timeouts
(let [ch (chan)]
(go (println "got" (<! (go (alts! [ch (timeout 3000)]))))))
;; default value support
(alts! [ch1 ch2] :default :nothing-found)
;; Go-lang example translated to Clojure
;; (from core.async examples)
(defn google [query]
(let [c (chan)
t (timeout 80)]
(go (>! c (<! (fastest query web1 web2))))
(go (>! c (<! (fastest query image1 image2))))
(go (>! c (<! (fastest query video1 video2))))
(go (loop [i 0 ret []]
(if (= i 3)
ret
(recur (inc i) (conj ret (alt! [c t] ([v] v)))))))))
(<!! (google "clojure"))
;; CHANNELS COMPOSITION
;; merge, mix - many-to-one
(let [ch1 (chan)
ch2 (chan)
mergedch (async/merge [ch1 ch2] 5)]
(>!! ch1 :foo)
(>!! ch2 :bar)
(println "got" (<!! mergedch))
(println "got" (<!! mergedch)))
;; mix is same, but allow to dynamically add/remove sources,
;; mute, pause, or solo-ing them
(let [ch1 (chan)
ch2 (chan)
mixed (chan 5)
mixer (async/mix mixed)]
(async/admix mixer ch1)
(async/admix mixer ch2)
(>!! ch1 :foo)
(>!! ch2 :bar)
(println "got" (<!! mixed))
(println "got" (<!! mixed)))
;; mult - one-to-many broadcasting
(let [src (chan)
mult (async/mult src)
outch (chan 5)]
(async/tap mult outch)
(>!! src :foo)
(println "got" (<!! outch)))
;; pub/sub
;; same as mult, but with multimethod-like dispatching
(let [src (chan)
pub (async/pub src :type)]
(let [c (chan)]
(async/sub pub :foo c)
(go (println "foo subscriber got" (<! c))))
(let [c (chan)]
(async/sub pub :bar c)
(go (println "bar subscriber got" (<! c))))
(>!! src {:type :foo :payload [42]})
(<!! (timeout 500))
(>!! src {:type :bar :payload [43]}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment