[clojure.test :as t :refer [deftest testing is are use-fixtures]]
[prpr.util.test :refer [with-log-level]]
[manifold.deferred :as d]
[ :as s]
[ :as sut]
[taoensso.timbre :refer [debug info warn]])
[ StreamError]))
(deftest s-first-test
(testing "empty stream"
(let [s (s/stream)
_ (s/close! s)]
(is (= ::none @(sut/s-first ::none s)))))
(testing "empty stream default no-val"
(let [s (s/stream)
_ (s/close! s)]
(is (= ::sut/none @(sut/s-first s)))))
(testing "first value"
(let [s (s/stream 3)
_ @(s/put-all! s [:foo :bar :baz])
_ (s/close! s)]
(is (= :foo @(sut/s-first s))))))
(deftest divert-stream-errors-test
(testing "empty source"
(let [s (s/stream)
_ (s/close! s)
[err o] (sut/divert-stream-errors s)]
(is (= [] (s/stream->seq o)))
(is (= [] (s/stream->seq err)))))
(testing "no errors source"
(let [s (s/stream 3)
_ @(s/put-all! s [:foo :bar :baz])
_ (s/close! s)
[err o] (sut/divert-stream-errors s)]
(is (= [:foo :bar :baz] (s/stream->seq o)))
(is (= [] (s/stream->seq err)))))
(testing "all errors source"
(let [s (s/stream 3)
_ @(s/put-all! s [(StreamError.
(ex-info "foo" {:foo :foo}))
(ex-info "bar" {:bar :bar}))
(ex-info "baz" {:baz :baz}))])
_ (s/close! s)
[err o] (sut/divert-stream-errors s)]
;; have to convert err first or it will block
(is (= [{:foo :foo}
{:bar :bar}
{:baz :baz}]
(map (comp ex-data :error) (s/stream->seq err))))
(is (= [] (s/stream->seq o)))))
(testing "mixed source"
(let [s (s/stream 3)
_ @(s/put-all! s [:foo
(ex-info "bar" {:bar :bar}))
_ (s/close! s)
[err o] (sut/divert-stream-errors s)]
;; have to take in the right order or will block
(is (= :foo @(s/take! o)))
(is (= {:bar :bar}
(-> @(s/take! err)
(is (= :baz @(s/take! o)))
(is (= ::closed @(s/take! o ::closed)))
(is (= ::closed @(s/take! err ::closed))))))
(deftest reduce-all-throw-test
(with-log-level :error
(testing "empty source"
(let [s (s/stream)
_ (s/close! s)
r @(sut/reduce-all-throw "empty-source" conj [] s)]
(is (= [] r))))
(testing "no errors source"
(let [s (s/stream 3)
_ @(s/put-all! s [:foo :bar :baz])
_ (s/close! s)
r @(sut/reduce-all-throw "no-errors-source" conj [] s)]
(is (= [:foo :bar :baz] r))))
(testing "all errors source"
(let [s (s/stream 3)
_ @(s/put-all! s [(StreamError.
(ex-info "foo" {:foo :foo}))
(ex-info "bar" {:bar :bar}))
(ex-info "baz" {:baz :baz}))])
_ (s/close! s)
r (sut/reduce-all-throw
ed (ex-data (d/error-value r ::oops))]
(is (= {:foo :foo} ed))))
(testing "mixed source"
(let [s (s/stream 3)
_ @(s/put-all! s [:foo
(ex-info "bar" {:bar :bar}))
_ (s/close! s)
r (sut/reduce-all-throw
ed (ex-data (d/error-value r ::oops))]
(is (= {:bar :bar} ed))))
(testing "no init"
(let [s (s/stream 3)
_ @(s/put-all! s [1 2 3])
_ (s/close! s)
r @(sut/reduce-all-throw
(is (= 6 r))))
(testing "no init error first"
(let [s (s/stream 3)
_ @(s/put-all! s [(StreamError.
(ex-info "bar" {:bar :bar}))
_ (s/close! s)
r (sut/reduce-all-throw
ed (ex-data (d/error-value r ::oops))]
(is (= {:bar :bar} ed))))
(testing "mixed no init"
(let [s (s/stream 3)
_ @(s/put-all! s [1
(ex-info "bar" {:bar :bar}))
_ (s/close! s)
r (sut/reduce-all-throw
ed (ex-data (d/error-value r ::oops))]
(is (= {:bar :bar} ed))))
(testing "throw gets caught"
(let [s (s/stream 1)
ev (d/chain (d/success-deferred :foo)
(fn [v]
(throw (ex-info "boo" {:foo ::foo}))))
_ @(s/put! s ev)
_ (s/close! s)
r (sut/reduce-all-throw
ed (ex-data (d/error-value r ::oops))]
(is (= {:foo ::foo} ed))))
(testing "captures errors during reduction"
(let [processed (atom [])
s (s/stream 8)
_ @(s/put-all! s [0 1 2 3 4 5 6 7])
_ (s/close! s)
r (sut/reduce-all-throw
(fn [rs i]
(swap! processed conj i)
(case i
3 (throw (ex-info (str ::odd-number) {:n i}))
5 (throw (ex-info (str ::odd-number) {:n i}))
#_else (conj rs i)))
rs (d/success-value r ::not-successful)
ed (ex-data (d/error-value r ::no-error))]
(is (= (range 0 8) @processed))
(is (= ::not-successful rs))
(is (= {:n 3} ed))))))
(deftest realize-each-captures-errors-test
(let [xs (->> [(d/future 0)
(sut/->StreamError (ex-info (str ::odd-number) {:n 1}))
(d/success-deferred 3)
(d/error-deferred (ex-info (str ::odd-number) {:n 5}))
(d/future (/ 1 0))]
;; NOTE: using vanilla stream reduce to confirm presence of errors
(is (= [0 2 3 4 6] (remove sut/stream-error? xs)))
(is (= 3 (count (filter sut/stream-error? xs))))
(is (= 1 (->> xs
(filter sut/stream-error?)
(fn [se]
(instance? ArithmeticException (.error se))))
(is (= [1 5]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest map-captures-errors-test
(let [xs (->> [0 1 2 3 4 5 6 7]
(fn [i]
(case i
1 (sut/->StreamError (ex-info (str ::odd-number) {:n i}))
3 (throw (ex-info (str ::odd-number) {:n i}))
4 (d/success-deferred 4)
5 (d/error-deferred (ex-info (str ::odd-number) {:n i}))
7 (d/future (throw (ex-info (str ::odd-number) {:n i})))
#_else i)))
;; NOTE: using vanilla stream reduce to confirm presence of errors
(is (= [0 2 4 6] (remove sut/stream-error? xs)))
(is (= 4 (count (filter sut/stream-error? xs))))
(is (= [1 3 5 7]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest map-skips-existing-streamerrors-test
(let [processed (atom [])
xs (->> [0
(sut/->StreamError (ex-info (str ::odd-number) {:n 1}))
(fn [i]
(swap! processed conj i)
(* i 2)))
;; NOTE: using vanilla stream reduce to confirm presence of error
(is (= [0 2] @processed))
(is (= [0 4] (remove sut/stream-error? xs)))
(is (= 1 (count (filter sut/stream-error? xs))))
(is (= [1]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest mapcat-captures-errors-test
(let [xs (->> [0 1 2 3 4 5]
(fn [i]
(case i
1 [(sut/->StreamError (ex-info (str ::odd-number) {:n i}))]
3 (throw (ex-info (str ::odd-number) {:n i}))
5 [i (* i nil)] ;; generate a NullPointerException
#_else [i (* i 10)])))
;; NOTE: using vanilla stream reduce to confirm presence of errors
(is (= [0 0 2 20 4 40] (remove sut/stream-error? xs)))
(is (= 3 (count (filter sut/stream-error? xs))))
(is (= 1 (->> xs
(filter sut/stream-error?)
(fn [se]
(instance? NullPointerException (.error se))))
(is (= [1 3]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest mapcat-skips-existing-streamerrors-test
(let [processed (atom [])
xs (->> [0
(sut/->StreamError (ex-info (str ::odd-number) {:n 1}))
(fn [i]
(swap! processed conj i)
[i (* i 10)]))
;; NOTE: using vanilla stream reduce to confirm presence of error
(is (= [0 2] @processed))
(is (= [0 0 2 20] (remove sut/stream-error? xs)))
(is (= 1 (count (filter sut/stream-error? xs))))
(is (= [1]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest filter-captures-errors-test
(let [xs (->> [0 1 2 3 4 5]
(fn [i]
(case i
0 false
1 (sut/->StreamError (ex-info (str ::odd-number) {:n i}))
3 (throw (ex-info (str ::odd-number) {:n i}))
5 (> i nil) ;; generate a NullPointerException
#_else i)))
;; NOTE: using vanilla stream reduce to confirm presence of errors
(is (= [2 4] (remove sut/stream-error? xs)))
(is (= 3 (count (filter sut/stream-error? xs))))
(is (= 1 (->> xs
(filter sut/stream-error?)
(fn [se]
(instance? NullPointerException (.error se))))
(is (= [1 3]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest transform-captures-errors-test
(let [xs (->> [0 1 2 3 4 5]
(fn [i]
(case i
1 (sut/->StreamError (ex-info (str ::odd-number) {:n i}))
3 (throw (ex-info (str ::odd-number) {:n i}))
5 (> i nil) ;; generate a NullPointerException
#_else i))))
;; NOTE: using vanilla stream reduce to confirm presence of errors
(is (= [0 2 4] (remove sut/stream-error? xs)))
(is (= 3 (count (filter sut/stream-error? xs))))
(is (= 1 (->> xs
(filter sut/stream-error?)
(fn [se]
(instance? NullPointerException (.error se))))
(is (= [1 3]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest transform-skips-existing-streamerrors-test
(let [processed (atom [])
xs (->> [0
(sut/->StreamError (ex-info (str ::odd-number) {:n 1}))
(fn [i]
(swap! processed conj i)
(* i 2))))
;; NOTE: using vanilla stream reduce to confirm presence of error
(is (= [0 2] @processed))
(is (= [0 4] (remove sut/stream-error? xs)))
(is (= 1 (count (filter sut/stream-error? xs))))
(is (= [1]
(->> xs
(filter sut/stream-error?)
(map (comp ex-data #(.error %)))
(remove nil?)
(map :n))))))
(deftest realize-stream-reduces-all-and-throws-first-error-test
(with-log-level :error
(let [processed (atom [])
s (s/stream 8)
_ @(s/put-all! s [0 1 2 3 4 5 6 7])
_ (s/close! s)
r (->> s
(fn [i]
(swap! processed conj i)
(case i
3 (throw (ex-info (str ::odd-number) {:n i}))
5 (throw (ex-info (str ::odd-number) {:n i}))
#_else i)))
rs (d/success-value r ::not-successful)
ed (ex-data (d/error-value r ::no-error))]
(is (= (range 0 8) @processed))
(is (= ::not-successful rs))
(is (= {:n 3} ed)))))
(deftest test-realize-stream-reduces-deferred-stream-and-throws-first-error-test
(with-log-level :error
(let [processed (atom [])
s (s/stream 8)
_ @(s/put-all! s [0 1 2 3 4 5 6 7])
_ (s/close! s)
r (->> s
(fn [i]
(swap! processed conj i)
(case i
3 (throw (ex-info (str ::odd-number) {:n i}))
5 (throw (ex-info (str ::odd-number) {:n i}))
#_else i)))
rs (d/success-value r ::not-successful)
ed (ex-data (d/error-value r ::no-error))]
(is (= (range 0 8) @processed))
(is (= ::not-successful rs))
(is (= {:n 3} ed)))))
(deftest map-serially*-test
(testing "behaves like map"
(let [r (->> [0 1 2 3 4 5]
(fn [v]
(Thread/sleep 20)
(inc v))))
(sut/reduce conj [])
(is (= r [1 2 3 4 5 6]))))
(testing "behaves like map with plain valued map fns"
(let [r (->> [0 1 2 3 4 5]
(sut/map-serially* ::map inc)
(sut/reduce conj [])
(is (= r [1 2 3 4 5 6]))))
(testing "catches errors"
(let [r (->> [0 1 2 3 4 5]
(fn [v]
(Thread/sleep 20)
(if (#{2 5} v)
(throw (ex-info "boo" {:v v}))
(inc v)))))
;; reduce without error propagation
(s/reduce conj [])
(is (= (filter number? r)
[1 2 4 5]))
(is (= (->> r
(filter sut/stream-error?)
(map (comp :v ex-data :error)))
[2 5]))))
(testing "catches errors with plain-valued map fns"
(let [r (->> [0 1 2 3 4 5]
(fn [v]
(if (#{2 5} v)
(throw (ex-info "boo" {:v v}))
(inc v))))
;; reduce without error propagation
(s/reduce conj [])
(is (= (filter number? r)
[1 2 4 5]))
(is (= (->> r
(filter sut/stream-error?)
(map (comp :v ex-data :error)))
[2 5]))))
(testing "applies strictly serially"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6]))
(is (= #{} @active-a))
(is (= 12 (count @active-history-a)))
(is (= [#{0} #{} #{1} #{} #{2} #{} #{3} #{} #{4} #{} #{5} #{}]
(deftest map-concurrency-two-test
(testing "limits concurrency to 2"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10]))
(is (= #{} @active-a))
(is (= 20 (count @active-history-a)))
(is (= 2 (->> @active-history-a
(map count)
(reduce max)))))))
(deftest buffer-concurrency-test
(testing "limits concurrency to 3"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(sut/buffer-concurrency 3)
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10]))
(is (= #{} @active-a))
(is (= 20 (count @active-history-a)))
(is (= 3 (->> @active-history-a
(map count)
(reduce max))))))
(testing "limits concurrency to 5"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(sut/buffer-concurrency 5)
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]))
(is (= #{} @active-a))
(is (= 30 (count @active-history-a)))
(is (= 5 (->> @active-history-a
(map count)
(reduce max)))))))
(deftest map-concurrently-test
(testing "limits concurrency to 1"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6]))
(is (= #{} @active-a))
(is (= 12 (count @active-history-a)))
(is (= [#{0} #{} #{1} #{} #{2} #{} #{3} #{} #{4} #{} #{5} #{}]
(testing "limits concurrency to 2"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10]))
(is (= #{} @active-a))
(is (= 20 (count @active-history-a)))
(is (= 2 (->> @active-history-a
(map count)
(reduce max))))))
(testing "limits concurrency to 3"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10]))
(is (= #{} @active-a))
(is (= 20 (count @active-history-a)))
(is (= 3 (->> @active-history-a
(map count)
(reduce max))))))
(testing "limits concurrency to 5"
(let [active-a (atom #{})
active-history-a (atom [])
r (->> [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14]
(fn [v]
(swap! active-a conj v)
(swap! active-history-a conj @active-a)
(Thread/sleep 20)
(swap! active-a disj v)
(swap! active-history-a conj @active-a)
(inc v))))
(s/reduce conj [])
(is (= r [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]))
(is (= #{} @active-a))
(is (= 30 (count @active-history-a)))
(is (= 5 (->> @active-history-a
(map count)
(reduce max))))))
(testing "timeout option"
(let [result-stream (->> (sut/->source (range 4))
(sut/map-concurrently {:timeout-ms 50}
(fn [_] (d/future (Thread/sleep 100))))
(is (= 4 (count result-stream)))
(is (= #{StreamError}
(into #{} (map type result-stream)))))))
