This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns clojure-async-example.core | |
(:require [clojure.core.async :as async :refer [chan dropping-buffer sliding-buffer | |
go go-loop thread | |
<! >! >!! <!! take! | |
alts! alts!! | |
timeout | |
close!]])) | |
;; core.async: | |
;; implementation of CSP (like Go lang) | |
;; works in ClojureScript (awesome!) | |
;; similar to Actor model, but with important differences | |
;; CHANNELS | |
;; most basic primitive of core.async | |
(def ch (chan)) | |
;; blocking put to channel | |
(>!! ch 42) | |
;; blocking read from channel | |
(<!! ch) | |
;; close channel | |
(close! ch) | |
;; closed channel ignores puts | |
;; closed channel returns nil on reads | |
;; channels works with future (native threads) | |
(let [fchan (chan)] | |
(future (println "got" (<!! fchan))) | |
(future (>!! fchan 42))) | |
;; core.async/thread - future which returs channel! | |
(println "got" (<!! (thread 42))) | |
;; GO BLOCKS | |
(<!! (go 42)) | |
;; go block is a macro which implements IOC to translate | |
;; synchronous code into callback state-machine. | |
;; go blocks returns channels too | |
(clojure.pprint/pprint (macroexpand '(go 42))) | |
(<!! (go (<! (go 42)))) | |
;; <! and >! are non-blocking equivalents of <!! and >!! | |
;; use them in go blocks | |
;; use <!! and >!! in threads | |
;; BUFFERS | |
(def nbc (chan)) ;; same as (def nbc (chan 1)) | |
(go (println "got" (<! nbc))) | |
(go (println "got" (<! nbc))) | |
(>!! nbc 42) | |
(>!! nbc 43) | |
;; dropping buffer (drop newest) | |
(def dropping-channel (chan (dropping-buffer 5))) | |
;; sliding buffer (drop oldest) | |
(def dropping-channel (chan (sliding-buffer 5))) | |
;; TIMEOUTS | |
;; timeouts are channels (awesome!) | |
(<!! (timeout 1000)) | |
;; DOM animation in ClojureScript | |
(go-loop [x 100] | |
(set-element-position ($ "#animated-stuff") x 50) | |
(<! (timeout 50)) | |
(when (> x 0) (recur [(dec x)]))) | |
;; when timeout interval is passed, timeout channel will close | |
;; and you'll got nil | |
;; ALTS | |
;; alts! will read first available value from many channels | |
(let [ch (chan) | |
ch2 (chan)] | |
(go (println "got" (<! (go (alts! [ch ch2]))))) | |
(>!! ch2 42)) | |
;; alts are very useful with timeouts | |
(let [ch (chan)] | |
(go (println "got" (<! (go (alts! [ch (timeout 3000)])))))) | |
;; default value support | |
(alts! [ch1 ch2] :default :nothing-found) | |
;; Go-lang example translated to Clojure | |
;; (from core.async examples) | |
(defn google [query] | |
(let [c (chan) | |
t (timeout 80)] | |
(go (>! c (<! (fastest query web1 web2)))) | |
(go (>! c (<! (fastest query image1 image2)))) | |
(go (>! c (<! (fastest query video1 video2)))) | |
(go (loop [i 0 ret []] | |
(if (= i 3) | |
ret | |
(recur (inc i) (conj ret (alt! [c t] ([v] v))))))))) | |
(<!! (google "clojure")) | |
;; CHANNELS COMPOSITION | |
;; merge, mix - many-to-one | |
(let [ch1 (chan) | |
ch2 (chan) | |
mergedch (async/merge [ch1 ch2] 5)] | |
(>!! ch1 :foo) | |
(>!! ch2 :bar) | |
(println "got" (<!! mergedch)) | |
(println "got" (<!! mergedch))) | |
;; mix is same, but allow to dynamically add/remove sources, | |
;; mute, pause, or solo-ing them | |
(let [ch1 (chan) | |
ch2 (chan) | |
mixed (chan 5) | |
mixer (async/mix mixed)] | |
(async/admix mixer ch1) | |
(async/admix mixer ch2) | |
(>!! ch1 :foo) | |
(>!! ch2 :bar) | |
(println "got" (<!! mixed)) | |
(println "got" (<!! mixed))) | |
;; mult - one-to-many broadcasting | |
(let [src (chan) | |
mult (async/mult src) | |
outch (chan 5)] | |
(async/tap mult outch) | |
(>!! src :foo) | |
(println "got" (<!! outch))) | |
;; pub/sub | |
;; same as mult, but with multimethod-like dispatching | |
(let [src (chan) | |
pub (async/pub src :type)] | |
(let [c (chan)] | |
(async/sub pub :foo c) | |
(go (println "foo subscriber got" (<! c)))) | |
(let [c (chan)] | |
(async/sub pub :bar c) | |
(go (println "bar subscriber got" (<! c)))) | |
(>!! src {:type :foo :payload [42]}) | |
(<!! (timeout 500)) | |
(>!! src {:type :bar :payload [43]})) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment