Skip to content

Instantly share code, notes, and snippets.

@RutledgePaulV
Last active September 4, 2018 06:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RutledgePaulV/6ddb7b29578de4442cd8d946885c38a8 to your computer and use it in GitHub Desktop.
Save RutledgePaulV/6ddb7b29578de4442cd8d946885c38a8 to your computer and use it in GitHub Desktop.
core-async-pipeline-bug.clj
(require '[clojure.test :refer :all])
(require '[clojure.core.async :as async])
(deftest from-channel-still-consumed-after-to-channel-is-closed
(let [from-counter (atom 0)
xf-counter (atom 0)
from (async/chan)
to (async/chan 1000)
xf (map #(do (Thread/sleep 100)
(swap! xf-counter inc)
%))]
; start the pipeline
(async/pipeline 1 to xf from)
(async/go-loop []
(async/>! from (swap! from-counter inc))
(recur))
; wait a bit, then close the to channel
(async/<!! (async/timeout 1000))
(async/close! to)
; comments on pipeline say that the from
; channel should stop being consumed
; because the to channel was closed
(let [from-snapshot (deref from-counter)
xf-snapshot (deref xf-counter)]
(async/<!! (async/timeout 200))
; this fails
(is (= from-snapshot (deref from-counter)))
(is (= xf-snapshot (deref xf-counter)))
(async/<!! (async/timeout 400))
; this fails (with higher counters)
(is (= from-snapshot (deref from-counter)))
(is (= xf-snapshot (deref xf-counter)))
(async/<!! (async/timeout 600))
; this fails (with higher counters)
(is (= from-snapshot (deref from-counter)))
(is (= xf-snapshot (deref xf-counter))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment