Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
async challenge from Philipp Haller's presenation http://www.infoq.com/presentations/rx-async using clojure's core.async
(ns challenge
" via http://www.infoq.com/presentations/rx-async
Two input streams with the following values:
stream1: 7, 1, 0, 2, 3, 1, ...
stream2: 0, 7, 0, 4, 6, 5, ...
Task:
Create a new output stream that
• yields, for each value of stream1, the sum of the previous 3
• values of stream1,except if the sum is greater than some threshold in
which case the next value of stream2 should be subtracted.
For a threshold of 5, the output stream has the following values:
output: 7, 1, 8, 3, 5, 2, ... "
(:require [clojure.core.async :as async]))
(defn example-streams []
(map async/to-chan [[7 1 0 2 3 1]
[0 7 0 4 6 5]]))
(def example-threshold 5)
(def expected-stream [7 1 8 3 5 2])
(defn pure-csp-version
"This version doesn't use any of the helper fns (e.g. filter) provided by core.async"
[threshold stream1 stream2]
(let [result-stream (async/chan)]
(async/go-loop [window [0 0 0]]
(when-let [val (async/<! stream1)]
(let [new-window (->> window (take 2) (cons val))
sum (apply + new-window)
result (if (> sum threshold)
(async/alt!
stream2 ([v] (- sum v))
:default nil)
sum)]
(when result
(async/>! result-stream result)
(recur new-window))))
(async/close! result-stream))
result-stream))
(->> (apply pure-csp-version example-threshold (example-streams))
(async/into [])
(async/<!!)) ;; => [7 1 8 3 5 2]
;; introducing a when-lets macro cleans it up a bit:
;; taken from https://github.com/runa-dev/kits/blob/master/src/kits/flowcontrol.clj
(defmacro if-lets
([bindings expr1]
`(if-lets ~bindings
~expr1
nil))
([bindings expr1 expr2]
(if (= 2 (count bindings))
`(if-let ~bindings
~expr1
~expr2)
`(if-let ~(vec (take 2 bindings))
(if-lets ~(vec (drop 2 bindings))
~expr1
~expr2)
~expr2))))
(defmacro when-lets [bindings & body]
`(if-lets ~bindings
(do ~@body)
nil))
(defn pure-csp-version
"This version doesn't use any of the helper fns (e.g. filter) provided by core.async"
[threshold stream1 stream2]
(let [result-stream (async/chan)]
(async/go-loop [window [0 0 0]]
(when-lets [val (async/<! stream1)
new-window (->> window (take 2) (cons val))
sum (apply + new-window)
result (if (> sum threshold)
(async/alt!
stream2 ([v] (- sum v))
:default nil)
sum)]
(async/>! result-stream result)
(recur new-window))
(async/close! result-stream))
result-stream))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.