Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Channels-driven concurrency with Clojure
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (1) Generator: function that returns the channel
(use 'lamina.core)
(defn boring
[name]
(let [ch (channel)]
;; future will run separately from main control flow
(future
;; emit string message five times with random delay
(dotimes [_ 5]
(let [after (int (rand 500))]
(Thread/sleep after)
(enqueue ch (str name ": I'm boring after " after)))))
;; return the channel to caller
ch))
;; With single instance
(let [joe (boring "Joe")]
(doseq [msg (lazy-channel-seq (take* 5 joe))] (println msg)))
(println "You're boring: I'm leaving.")
;; Process all messages from channel
;; Please, note this notation is asynchronous, so...
(let [joe (boring "Joe")] (receive-all joe println))
;; you will see this message first :)
(println "You're boring: I'm leaving.")
;; More instances...
;; Actually, this is little bit tricky and it's definitely other
;; mechanism than we use in Go for this example. It's more
;; likely what we do in "#2 Fan-in" code examples.
(let [joe (boring "Joe") ann (boring "Ann") chs (channel)]
(doseq [ch [joe ann]] (join ch chs))
(receive-all chs println))
(println "You're boring: I'm leaving.")
;; More instances...
;; Read from one channel, than - from second
(let [joe (boring "Joe") ann (boring "Ann")]
(loop []
(doseq [ch [joe ann]]
;; TODO: Fix checking for channel closing (this is wrong way)
(when-not (closed? ch) (println @(read-channel ch))))
(recur)))
;; Read from one channel, than - from second
;; Several improvements in order to stop execution,
;; when both channels are closed (without any information
;; about total count of messages)
(let [joe (boring "Joe") ann (boring "Ann") run (atom 2)]
(loop []
(doseq [ch [joe ann]]
;; TODO: Fix checking for channel closing (this is wrong way)
(if (closed? ch)
(swap! run dec)
(println @(read-channel ch))))
(if (> @run 0) (recur))))
(println "You're boring: I'm leaving.")
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (2) Fan-in
;; "Hand-made" one
(defn fan-in
[input1 input2]
(let [ch (channel) pusher (partial enqueue ch)]
(doseq [x [input1 input2]] (receive-all x pusher)) ch))
;; Or any count of inputs instead of just 2
(defn fan-in
[& inputs]
(let [ch (channel) pusher (partial enqueue ch)]
(doseq [x inputs] (receive-all x pusher)) ch))
;; Or more "clojurian" approach with join
(defn fan-in
[& inputs]
(let [ch (channel)] (doseq [x inputs] (join x ch)) ch))
;; Do printing only 10 times
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
(receive-all (take* 10 ch) println))
;; Or any times something will be pushed to channel
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println))
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (3) Select
;; Clojure doesn't have "select" (mostly cause of functional approach),
;; but we can simulate it using map* and case calls
(let [joe (boring "Joe")
;; Will generate messages each 60 ms
timer (periodically 60 (fn [] "You're too slow!"))
;; All channels will be joined with this one
select (channel)]
(doseq
[[t ch] [["joe" joe] ["timer" timer]]]
;; Map message to struct [type message]
;; TODO: Check if I can you (named-channel) for this
(join (map* (partial conj [t]) ch) select))
;; Read from channel until it's not closed (in blocking mode)
(receive-all select
(fn [[name msg]]
(println (str msg
(case name
"joe" " <== Message from Joe"
"timer" " <== Timeout"))))))
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (4) Timeouts
;; To test timeouts let add one line into boring generator
(defn boring
[name]
(let [ch (channel)]
;; future will run separately from main control flow
(future
;; emit string message five times with random delay
(dotimes [_ 5]
(let [after (int (rand 100))]
(Thread/sleep after)
(enqueue ch (str name ": I'm boring after " after))))
(close ch)) ;; <--- Here. Let's close channel after 5 messages.
;; return the channel to caller
ch))
;; Timeout for whole conversation
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
;; note 3rd param for lazy-channel-seq function
(doseq [msg (lazy-channel-seq (take* 10 ch) 500)] (println msg)))
(println "You're boring: I'm leaving.")
;; Timeout for each message
;; There are multiple ways of how to perform such functionality.
;; Here you can find approach using (with-timeout _) wrapper for ResultChannel.
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))]
(loop []
;; TODO: Fix checking for channel closing (this is wrong way)
(when-not (closed? ch)
(println @(with-timeout 50 (read-channel ch))) (recur))))
;; Channels-driven concurrency with Clojure
;; Clojure variant for code examples from this gist:
;; https://gist.github.com/3124594
;; Primarily taken from Rob Pike's talk on Google I/O 2012:
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be
;;
;; Concurrency is the key to designing high performance network services.
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc.
;; There is no implementation for "Go channels" in core, but we can use
;; 3rd-party library Lamina to do the same things.
;;
;; https://github.com/ztellman/lamina
;;
;; I should also mention, that this is not a simple copy of syntax/semantic notations
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with
;; functional approach of data transformation from initial to desired state).
;; (7) Fake Google search
(defn fake-search
[kind]
(fn [query ch]
;; Fake seach will work in async mode and will push result to channel
(future
(Thread/sleep (int (rand 100)))
(enqueue ch (str kind " result for " query)) (close ch))
ch))
(defn fan-in
[& inputs]
(let [ch (channel)]
(doseq [x inputs] (join x ch)) ch))
(defn fastest
[query & replicas]
(let [chs (map #(% query (channel)) replicas)
ch (apply fan-in chs)]
@(read-channel ch)))
(defn google
[query]
(println "Start searching")
(let [ch (channel)]
(doseq [s '(["Web1" "Web2"]
["Image1" "Image2"]
["Video1" "Video2"])]
(future
(enqueue ch
(apply fastest (conj (map fake-search s) query)))))
(time ;; This macro will check elapsed time for calculations
(doseq [msg (lazy-channel-seq (take* 3 ch) 80)]
(println msg)))))
;; Output result for calling
;; (google "Go & Clojure channels")
;;
;; Start searching
;; Video1 result for Go & Clojure channels
;; Image2 result for Go & Clojure channels
;; Web2 result for Go & Clojure channels
;; "Elapsed time: 74.172 msecs"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment