/gist:b7ecd4395a0d3d473de6 Secret
Created
July 25, 2012 21:29
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 2c43819d3de71d0e86cea59101afb311eee42da5 | |
Author: Alan Malloy <alan@malloys.org> | |
Date: Tue Jul 24 19:06:05 2012 -0700 | |
Fix race condition in seque | |
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj | |
index c4710c6..1d85a29 100644 | |
--- a/src/clj/clojure/core.clj | |
+++ b/src/clj/clojure/core.clj | |
@@ -4756,18 +4756,25 @@ | |
n-or-q | |
(LinkedBlockingQueue. (int n-or-q))) | |
NIL (Object.) ;nil sentinel since LBQ doesn't support nils | |
- agt (agent (seq s)) | |
+ agt (agent (lazy-seq s)) ; never start with nil; that signifies we've already put eos | |
+ log-error (fn [q e] | |
+ (if (.offer q q) | |
+ (throw e) | |
+ e)) | |
fill (fn [s] | |
+ (when s | |
+ (if (instance? Exception s) ; we failed to .offer an error earlier | |
+ (log-error q s) | |
(try | |
- (loop [[x & xs :as s] s] | |
+ (loop [[x & xs :as s] (seq s)] | |
(if s | |
(if (.offer q (if (nil? x) NIL x)) | |
(recur xs) | |
s) | |
- (.put q q))) ; q itself is eos sentinel | |
+ (when-not (.offer q q) ; q itself is eos sentinel | |
+ ()))) ; empty seq, not nil, so we know to put eos next time | |
(catch Exception e | |
- (.put q q) | |
- (throw e)))) | |
+ (log-error q e)))))) | |
drain (fn drain [] | |
(lazy-seq | |
(let [x (.take q)] | |
diff --git a/test/clojure/test_clojure/agents.clj b/test/clojure/test_clojure/agents.clj | |
index 49c209e..45a176f 100644 | |
--- a/test/clojure/test_clojure/agents.clj | |
+++ b/test/clojure/test_clojure/agents.clj | |
@@ -151,6 +151,28 @@ | |
(.join)) | |
(is (= @a :thread-binding)))) | |
+;; check for a race condition that was causing seque to leak threads from the | |
+;; send-off pool. Specifically, if we consume all items from the seque, and | |
+;; the LBQ continues to grow, it means there was an agent action blocking on | |
+;; the .put, which would block indefinitely outside of this test. | |
+(deftest seque-threads | |
+ (let [queue-size 5 | |
+ slow-seq (for [x (take (* 2 queue-size) (iterate inc 0))] | |
+ (do (Thread/sleep 25) | |
+ x)) | |
+ small-lbq (java.util.concurrent.LinkedBlockingQueue. queue-size) | |
+ worker (seque small-lbq slow-seq)] | |
+ (doall worker) | |
+ (is (= worker slow-seq)) | |
+ (Thread/sleep 250) ;; make sure agents have time to run or get blocked | |
+ (let [queue-backlog (.size small-lbq)] | |
+ (is (<= 0 queue-backlog queue-size)) | |
+ (when-not (zero? queue-backlog) | |
+ (.take small-lbq) | |
+ (Thread/sleep 250) ;; see if agent was blocking, indicating a thread leak | |
+ (is (= (.size small-lbq) | |
+ (dec queue-backlog))))))) | |
+ | |
; http://clojure.org/agents | |
; agent |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment