@jeroenvandijk
Last active October 5, 2016 10:39
Manifold buffer stream issue
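(ns manifold-test
  (:require [midje.sweet :refer :all]
            ;; manifold.deferred is needed for the d/zip', d/chain' and d/timeout! calls below
            [manifold.deferred :as d]
            [manifold.stream :as s]))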
;; Observed with [manifold "0.1.6-alpha1"] and [org.clojure/clojure "1.8.0"] and Java 8
;; Used [midje "1.8.3"] for testing
(facts "about manifold.stream/buffered-stream"
  (fact "Issue 1: goes over capacity when the caller of manifold.stream/put! doesn't respect back pressure"
    (let [s (s/buffered-stream 10)
          f (future
              (dotimes [i 100]
                ;; The returned deferred is ignored, so the producer never waits
                (s/put! s i)))]
      ;; Wait a bit to fill up the stream
      (Thread/sleep 100)
      (s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}
      ;; Cleanup
      (future-cancel f)))
  (fact "Issue 2: goes over capacity when the caller of manifold.stream/try-put! doesn't respect back pressure"
    (let [s (s/buffered-stream 10)
          f (future
              (dotimes [i 100]
                ;; The returned deferred is ignored, so the producer never waits
                (s/try-put! s i 100 :timeout)))]
      ;; Wait a bit to fill up the stream
      (Thread/sleep 100)
      (s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}
      ;; Cleanup
      (future-cancel f)))
  (fact "doesn't go over capacity when the caller of manifold.stream/put! respects back pressure"
    (let [s (s/buffered-stream 10)
          f (future
              (dotimes [i 100]
                ;; Dereferencing blocks the producer until the put is accepted
                @(s/put! s i)))]
      ;; Wait a bit to fill up the stream
      (Thread/sleep 100)
      ;; Observed :buffer-size is 11, one more than the configured capacity of 10
      (s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}
      ;; Cleanup
      (future-cancel f)))
  (fact "doesn't go over capacity when the caller of manifold.stream/try-put! respects back pressure"
    (let [s (s/buffered-stream 10)
          f (future
              (dotimes [i 100]
                ;; Dereferencing blocks the producer until the put is accepted or times out
                @(s/try-put! s i 100 :timeout)))]
      ;; Wait a bit to fill up the stream
      (Thread/sleep 100)
      ;; Observed :buffer-size is 11, one more than the configured capacity of 10
      (s/description s) => {:buffer-capacity 10, :buffer-size 11, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}
      ;; Cleanup
      (future-cancel f))))
(facts "about manifold.stream/stream"
  (fact "doesn't go over capacity even when the caller of manifold.stream/put! doesn't respect back pressure"
    (let [s (s/stream 10)
          f (future
              (dotimes [i 100]
                (s/put! s i)))]
      ;; Wait a bit to fill up the stream
      (Thread/sleep 100)
      ;; The 90 puts beyond capacity are held as pending puts instead of entering the buffer
      (s/description s) => {:buffer-capacity 10, :buffer-size 10, :closed? false, :drained? false, :pending-puts 90, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}
      ;; Cleanup
      (future-cancel f))))
(fact "the put pattern used by manifold.bus/event-bus doesn't respect the back pressure of manifold.stream/buffered-stream"
  ;; https://github.com/ztellman/manifold/blob/d2570579f972a6860a4661dba3053cf9d209f1a4/src/manifold/bus.clj#L119
  (let [s (s/buffered-stream 10)]
    ;; All 100 puts are issued up front; only the combined deferred is awaited
    (-> (apply d/zip' (map #(s/put! s %) (range 100)))
        (d/chain' (fn [_] true))
        (d/timeout! 100 :timeout)
        deref)
    => :timeout
    (s/description s) => {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false, :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true, :source? true, :type "manifold"}))