Last active
August 29, 2015 13:57
-
-
Save bmabey/9356141 to your computer and use it in GitHub Desktop.
Async challenge from Philipp Haller's presentation http://www.infoq.com/presentations/rx-async, solved using Clojure's core.async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns challenge | |
" via http://www.infoq.com/presentations/rx-async | |
Two input streams with the following values: | |
stream1: 7, 1, 0, 2, 3, 1, ... | |
stream2: 0, 7, 0, 4, 6, 5, ... | |
Task: | |
Create a new output stream that | |
• yields, for each value of stream1, the sum of the previous 3
  values of stream1, except if the sum is greater than some threshold, in
  which case the next value of stream2 should be subtracted.
For a threshold of 5, the output stream has the following values: | |
output: 7, 1, 8, 3, 5, 2, ... " | |
(:require [clojure.core.async :as async])) | |
(defn example-streams
  "Returns a lazy seq of two fresh channels, [stream1 stream2], each pre-loaded
  with the sample values from the challenge description."
  []
  (let [stream1-values [7 1 0 2 3 1]
        stream2-values [0 7 0 4 6 5]]
    (map #(async/to-chan %) [stream1-values stream2-values])))
(def example-threshold
  "Threshold used by the worked example in the namespace docstring."
  5)

(def expected-stream
  "Output the challenge expects for the example streams and `example-threshold`."
  [7 1 8 3 5 2])
(defn pure-csp-version
  "Returns a channel that yields one value for each value taken from `stream1`:
  the sum of that value and the two values taken before it (missing history
  counts as 0). When that sum is greater than `threshold`, the next value of
  `stream2` is taken and subtracted from the sum.

  The go block parks until `stream2` delivers a value when one is needed, so
  the output does not depend on how quickly the producers fill their channels.
  The returned channel is closed when `stream1` is closed, or when `stream2` is
  closed at a point where one of its values is required.

  This version doesn't use any of the helper fns (e.g. filter) provided by core.async"
  [threshold stream1 stream2]
  (let [result-stream (async/chan)]
    (async/go-loop [window [0 0 0]]
      (let [value      (async/<! stream1)
            ;; Newest value first; only the two most recent older values are kept.
            new-window (when value
                         (->> window (take 2) (cons value)))
            sum        (when new-window
                         (apply + new-window))
            ;; nil means "stop": stream1 is exhausted, or stream2 was closed
            ;; while a value from it was needed.
            result     (cond
                         (nil? sum)        nil
                         (> sum threshold) (when-let [v (async/<! stream2)]
                                             (- sum v))
                         :else             sum)]
        ;; Both branches end the loop body, so recur is in tail position and
        ;; close! runs exactly once, when the loop terminates.
        (if result
          (do
            (async/>! result-stream result)
            (recur new-window))
          (async/close! result-stream))))
    result-stream))
;; Run the solution against the example streams and block until every output
;; value has been collected into a vector.
(let [[stream1 stream2] (example-streams)
      output            (pure-csp-version example-threshold stream1 stream2)]
  (async/<!! (async/into [] output))) ;; => [7 1 8 3 5 2]
;; introducing a when-lets macro cleans it up a bit: | |
;; taken from https://github.com/runa-dev/kits/blob/master/src/kits/flowcontrol.clj | |
(defmacro if-lets
  "Like `if-let`, but accepts any number of binding pairs.

  The pairs are evaluated left to right, and each init expression may refer to
  the names bound before it. If every bound value is truthy, `expr1` is
  evaluated with all names in scope; as soon as one value is falsey the
  remaining init expressions are skipped and `expr2` (default nil) is
  evaluated instead. An empty binding vector evaluates `expr1`.

  Throws IllegalArgumentException at macro-expansion time when `bindings` is
  not a vector or holds an odd number of forms."
  ([bindings expr1]
   `(if-lets ~bindings
      ~expr1
      nil))
  ([bindings expr1 expr2]
   (when-not (vector? bindings)
     (throw (IllegalArgumentException.
             "if-lets requires a vector for its bindings")))
   (when (odd? (count bindings))
     (throw (IllegalArgumentException.
             "if-lets requires an even number of forms in its binding vector")))
   (cond
     ;; Nothing to test: the success branch is taken unconditionally.
     (empty? bindings)
     expr1

     ;; A single pair is exactly what the core `if-let` handles.
     (= 2 (count bindings))
     `(if-let ~bindings
        ~expr1
        ~expr2)

     ;; Peel off the first pair and recurse on the rest inside its success
     ;; branch; the failure branch is repeated at every level so that a
     ;; falsey value at any position reaches `expr2`.
     :else
     `(if-let ~(subvec bindings 0 2)
        (if-lets ~(subvec bindings 2)
          ~expr1
          ~expr2)
        ~expr2))))
(defmacro when-lets
  "Multi-binding counterpart of `when-let`: evaluates `body` in an implicit `do`
  when every value bound in `bindings` is truthy, and yields nil otherwise."
  [bindings & body]
  ;; The two-argument form of if-lets supplies nil as the failure branch.
  `(if-lets ~bindings (do ~@body)))
(defn pure-csp-version
  "Returns a channel that yields one value for each value taken from `stream1`:
  the sum of that value and the two values taken before it (missing history
  counts as 0). When that sum is greater than `threshold`, the next value of
  `stream2` is taken and subtracted from the sum.

  The go block parks until `stream2` delivers a value when one is needed, so
  the output does not depend on how quickly the producers fill their channels.
  The returned channel is closed when `stream1` is closed, or when `stream2` is
  closed at a point where one of its values is required.

  Same algorithm as the plain `when-let` formulation, expressed with the
  `if-lets` macro so the whole pipeline reads as one chain of bindings.

  This version doesn't use any of the helper fns (e.g. filter) provided by core.async"
  [threshold stream1 stream2]
  (let [result-stream (async/chan)]
    (async/go-loop [window [0 0 0]]
      (if-lets [value      (async/<! stream1)
                ;; Newest value first; only the two most recent older values are kept.
                new-window (->> window (take 2) (cons value))
                sum        (apply + new-window)
                ;; A nil here (stream2 closed) falls through to the close! branch.
                result     (if (> sum threshold)
                             (when-let [v (async/<! stream2)]
                               (- sum v))
                             sum)]
        (do
          (async/>! result-stream result)
          (recur new-window))
        ;; Failure branch of if-lets: any binding above was nil, so the output
        ;; is finished. Keeping close! here leaves recur in tail position.
        (async/close! result-stream)))
    result-stream))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment