Created
January 6, 2018 16:51
-
-
Save brianru/d30f0319e7a14d875a80762937cccb9c to your computer and use it in GitHub Desktop.
puts should only be processed when the buffer is not full
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 4e21ed98c9084b60661ba206263e9855669b6eba Mon Sep 17 00:00:00 2001 | |
From: Brian James Rubinton <brianrubinton@gmail.com> | |
Date: Sat, 6 Jan 2018 11:07:09 -0500 | |
Subject: [PATCH] only process puts on take! when buffer is not full | |
--- | |
src/main/clojure/cljs/core/async/impl/channels.cljs | 2 +- | |
src/main/clojure/clojure/core/async/impl/channels.clj | 9 ++++++--- | |
2 files changed, 7 insertions(+), 4 deletions(-) | |
diff --git a/src/main/clojure/cljs/core/async/impl/channels.cljs b/src/main/clojure/cljs/core/async/impl/channels.cljs | |
index 4435a22..be04e94 100644 | |
--- a/src/main/clojure/cljs/core/async/impl/channels.cljs | |
+++ b/src/main/clojure/cljs/core/async/impl/channels.cljs | |
@@ -85,31 +85,31 @@ | |
(when (impl/blockable? handler) | |
(assert (< (.-length puts) impl/MAX-QUEUE-SIZE) | |
(str "No more than " impl/MAX-QUEUE-SIZE | |
" pending puts are allowed on a single channel." | |
" Consider using a windowed buffer.")) | |
(.unbounded-unshift puts (PutBox. handler val))) | |
nil))))))) | |
impl/ReadPort | |
(take! [this ^not-native handler] | |
(if (not ^boolean (impl/active? handler)) | |
nil | |
(if (and (not (nil? buf)) (pos? (count buf))) | |
(do | |
(if-let [take-cb (impl/commit handler)] | |
(let [val (impl/remove! buf) | |
- [done? cbs] (when (pos? (.-length puts)) | |
+ [done? cbs] (when (and (not (impl/full? buf)) (pos? (.-length puts))) | |
(loop [cbs []] | |
(let [putter (.pop puts) | |
^not-native put-handler (.-handler putter) | |
val (.-val putter) | |
cb (and ^boolean (impl/active? put-handler) (impl/commit put-handler)) | |
cbs (if cb (conj cbs cb) cbs) | |
done? (when cb (reduced? (add! buf val)))] | |
(if (and (not done?) (not (impl/full? buf)) (pos? (.-length puts))) | |
(recur cbs) | |
[done? cbs]))))] | |
(when done? | |
(abort this)) | |
(doseq [cb cbs] | |
(dispatch/run #(cb true))) | |
(box val)))) | |
diff --git a/src/main/clojure/clojure/core/async/impl/channels.clj b/src/main/clojure/clojure/core/async/impl/channels.clj | |
index 063ffef..f3f2ca3 100644 | |
--- a/src/main/clojure/clojure/core/async/impl/channels.clj | |
+++ b/src/main/clojure/clojure/core/async/impl/channels.clj | |
@@ -161,33 +161,37 @@ | |
impl/ReadPort | |
(take! | |
[this handler] | |
(.lock mutex) | |
(cleanup this) | |
(let [^Lock handler handler | |
commit-handler (fn [] | |
(.lock handler) | |
(let [take-cb (and (impl/active? handler) (impl/commit handler))] | |
(.unlock handler) | |
take-cb))] | |
(if (and buf (pos? (count buf))) | |
(do | |
(if-let [take-cb (commit-handler)] | |
(let [val (impl/remove! buf) | |
- iter (.iterator puts) | |
+ ;; a buffer will remain full after removing an item when the | |
+ ;; channel's xform is expanding and an element's expansion has | |
+ ;; not completed | |
+ full? (impl/full? buf) | |
+ iter (when full? (.iterator puts)) | |
[done? cbs] | |
- (when (.hasNext iter) | |
+ (when (and (not full?) (.hasNext iter)) | |
(loop [cbs [] | |
[^Lock putter val] (.next iter)] | |
(.lock putter) | |
(let [cb (and (impl/active? putter) (impl/commit putter))] | |
(.unlock putter) | |
(.remove iter) | |
(let [cbs (if cb (conj cbs cb) cbs) | |
done? (when cb (reduced? (add! buf val)))] | |
(if (and (not done?) (not (impl/full? buf)) (.hasNext iter)) | |
(recur cbs (.next iter)) | |
[done? cbs])))))] | |
(when done? | |
(abort this)) | |
(.unlock mutex) | |
(doseq [cb cbs] | |
@@ -288,16 +292,15 @@ | |
([buf xform exh] | |
(ManyToManyChannel. | |
(LinkedList.) (LinkedList.) buf (atom false) (mutex/mutex) | |
(let [add! (if xform (xform impl/add!) impl/add!)] | |
(fn | |
([buf] | |
(try | |
(add! buf) | |
(catch Throwable t | |
(handle buf exh t)))) | |
([buf val] | |
(try | |
(add! buf val) | |
(catch Throwable t | |
(handle buf exh t))))))))) | |
- | |
-- | |
2.14.3 (Apple Git-98) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment