Skip to content

Instantly share code, notes, and snippets.

@brianru
Created January 6, 2018 16:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianru/d30f0319e7a14d875a80762937cccb9c to your computer and use it in GitHub Desktop.
Save brianru/d30f0319e7a14d875a80762937cccb9c to your computer and use it in GitHub Desktop.
Pending puts should only be processed on take! when the buffer is not full
From 4e21ed98c9084b60661ba206263e9855669b6eba Mon Sep 17 00:00:00 2001
From: Brian James Rubinton <brianrubinton@gmail.com>
Date: Sat, 6 Jan 2018 11:07:09 -0500
Subject: [PATCH] only process puts on take! when buffer is not full
---
src/main/clojure/cljs/core/async/impl/channels.cljs | 2 +-
src/main/clojure/clojure/core/async/impl/channels.clj | 9 ++++++---
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/src/main/clojure/cljs/core/async/impl/channels.cljs b/src/main/clojure/cljs/core/async/impl/channels.cljs
index 4435a22..be04e94 100644
--- a/src/main/clojure/cljs/core/async/impl/channels.cljs
+++ b/src/main/clojure/cljs/core/async/impl/channels.cljs
@@ -85,31 +85,31 @@
(when (impl/blockable? handler)
(assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
(str "No more than " impl/MAX-QUEUE-SIZE
" pending puts are allowed on a single channel."
" Consider using a windowed buffer."))
(.unbounded-unshift puts (PutBox. handler val)))
nil)))))))
impl/ReadPort
(take! [this ^not-native handler]
(if (not ^boolean (impl/active? handler))
nil
(if (and (not (nil? buf)) (pos? (count buf)))
(do
(if-let [take-cb (impl/commit handler)]
(let [val (impl/remove! buf)
- [done? cbs] (when (pos? (.-length puts))
+ [done? cbs] (when (and (not (impl/full? buf)) (pos? (.-length puts)))
(loop [cbs []]
(let [putter (.pop puts)
^not-native put-handler (.-handler putter)
val (.-val putter)
cb (and ^boolean (impl/active? put-handler) (impl/commit put-handler))
cbs (if cb (conj cbs cb) cbs)
done? (when cb (reduced? (add! buf val)))]
(if (and (not done?) (not (impl/full? buf)) (pos? (.-length puts)))
(recur cbs)
[done? cbs]))))]
(when done?
(abort this))
(doseq [cb cbs]
(dispatch/run #(cb true)))
(box val))))
diff --git a/src/main/clojure/clojure/core/async/impl/channels.clj b/src/main/clojure/clojure/core/async/impl/channels.clj
index 063ffef..f3f2ca3 100644
--- a/src/main/clojure/clojure/core/async/impl/channels.clj
+++ b/src/main/clojure/clojure/core/async/impl/channels.clj
@@ -161,33 +161,37 @@
impl/ReadPort
(take!
[this handler]
(.lock mutex)
(cleanup this)
(let [^Lock handler handler
commit-handler (fn []
(.lock handler)
(let [take-cb (and (impl/active? handler) (impl/commit handler))]
(.unlock handler)
take-cb))]
(if (and buf (pos? (count buf)))
(do
(if-let [take-cb (commit-handler)]
(let [val (impl/remove! buf)
- iter (.iterator puts)
+ ;; The buffer can still be full after removing an item: when the
+ ;; channel's xform is expanding, an element's expansion may not have
+ ;; completed yet. Pending puts are processed only once there is room.
+ full? (impl/full? buf)
+ iter (when full? (.iterator puts))
[done? cbs]
- (when (.hasNext iter)
+ (when (and (not full?) (.hasNext iter))
(loop [cbs []
[^Lock putter val] (.next iter)]
(.lock putter)
(let [cb (and (impl/active? putter) (impl/commit putter))]
(.unlock putter)
(.remove iter)
(let [cbs (if cb (conj cbs cb) cbs)
done? (when cb (reduced? (add! buf val)))]
(if (and (not done?) (not (impl/full? buf)) (.hasNext iter))
(recur cbs (.next iter))
[done? cbs])))))]
(when done?
(abort this))
(.unlock mutex)
(doseq [cb cbs]
@@ -288,16 +292,15 @@
([buf xform exh]
(ManyToManyChannel.
(LinkedList.) (LinkedList.) buf (atom false) (mutex/mutex)
(let [add! (if xform (xform impl/add!) impl/add!)]
(fn
([buf]
(try
(add! buf)
(catch Throwable t
(handle buf exh t))))
([buf val]
(try
(add! buf val)
(catch Throwable t
(handle buf exh t)))))))))
-
--
2.14.3 (Apple Git-98)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment