secret
Created

  • Download Gist
gistfile1.diff
Diff
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
commit 2c43819d3de71d0e86cea59101afb311eee42da5
Author: Alan Malloy <alan@malloys.org>
Date: Tue Jul 24 19:06:05 2012 -0700
 
Fix race condition in seque
 
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index c4710c6..1d85a29 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -4756,18 +4756,25 @@
n-or-q
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since LBQ doesn't support nils
- agt (agent (seq s))
+ agt (agent (lazy-seq s)) ; never start with nil; that signifies we've already put eos
+ log-error (fn [q e]
+ (if (.offer q q)
+ (throw e)
+ e))
fill (fn [s]
+ (when s
+ (if (instance? Exception s) ; we failed to .offer an error earlier
+ (log-error q s)
(try
- (loop [[x & xs :as s] s]
+ (loop [[x & xs :as s] (seq s)]
(if s
(if (.offer q (if (nil? x) NIL x))
(recur xs)
s)
- (.put q q))) ; q itself is eos sentinel
+ (when-not (.offer q q) ; q itself is eos sentinel
+ ()))) ; empty seq, not nil, so we know to put eos next time
(catch Exception e
- (.put q q)
- (throw e))))
+ (log-error q e))))))
drain (fn drain []
(lazy-seq
(let [x (.take q)]
diff --git a/test/clojure/test_clojure/agents.clj b/test/clojure/test_clojure/agents.clj
index 49c209e..45a176f 100644
--- a/test/clojure/test_clojure/agents.clj
+++ b/test/clojure/test_clojure/agents.clj
@@ -151,6 +151,28 @@
(.join))
(is (= @a :thread-binding))))
+;; check for a race condition that was causing seque to leak threads from the
+;; send-off pool. Specifically, if we consume all items from the seque, and
+;; the LBQ continues to grow, it means there was an agent action blocking on
+;; the .put, which would block indefinitely outside of this test.
+(deftest seque-threads
+ (let [queue-size 5
+ slow-seq (for [x (take (* 2 queue-size) (iterate inc 0))]
+ (do (Thread/sleep 25)
+ x))
+ small-lbq (java.util.concurrent.LinkedBlockingQueue. queue-size)
+ worker (seque small-lbq slow-seq)]
+ (doall worker)
+ (is (= worker slow-seq))
+ (Thread/sleep 250) ;; make sure agents have time to run or get blocked
+ (let [queue-backlog (.size small-lbq)]
+ (is (<= 0 queue-backlog queue-size))
+ (when-not (zero? queue-backlog)
+ (.take small-lbq)
+ (Thread/sleep 250) ;; see if agent was blocking, indicating a thread leak
+ (is (= (.size small-lbq)
+ (dec queue-backlog)))))))
+
; http://clojure.org/agents
; agent

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.