Created
April 21, 2011 15:30
-
-
Save pepijndevos/934781 to your computer and use it in GitHub Desktop.
Alternative implementation of clojure.core/seque
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns test | |
(:import [java.util.concurrent | |
BlockingQueue | |
LinkedBlockingQueue | |
SynchronousQueue | |
PriorityBlockingQueue | |
CyclicBarrier]) | |
(:use clojure.test) | |
(:refer-clojure :exclude [seque])) | |
;; some BlockingQueues are... | |
;; not ordered | |
;; not finite | |
;; unable to contain nil | |
;; without content | |
;; mutable | |
;; dropping or receiving extra items
;; seque... | |
;; fills the queue on another thread
;; propagates errors | |
;; replaces nil with a sentinel | |
;; detects the end of the input seq | |
;; can be consumed on any thread | |
(defn seque
  "Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

  nil items travel through the queue as a private sentinel object and
  are turned back into nil on the way out. Anything thrown while
  realizing s on the background thread is rethrown on the consuming
  thread once the consumer reaches that point of the seq."
  {:added "1.0"}
  ([s] (seque 100 s))
  ([n-or-q s]
   (let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
                            n-or-q
                            (LinkedBlockingQueue. (int n-or-q)))
         NIL (Object.) ;nil sentinel since BQ doesn't support nils
         s (map (fnil identity NIL) s)
         ;; Unbounded side queue of zero-arg fns, one per step of the
         ;; output seq: take an item from q, rethrow a producer error,
         ;; or signal the end of the input.
         channel (LinkedBlockingQueue.)
         fill (fn fill [s]
                (try
                  ;; The loop must live inside the try: Clojure refuses
                  ;; to compile a recur whose target is outside the
                  ;; enclosing try ("Cannot recur across try").
                  (loop [s s]
                    (if (seq s)
                      (do
                        ;; Announce the item before the potentially
                        ;; blocking put on q.
                        (.put channel #(.take q))
                        (.put q (first s))
                        (recur (next s)))
                      ;; End of input: the consumer treats this
                      ;; InterruptedException as the end-of-seq marker.
                      (.put channel #(throw (InterruptedException.)))))
                  ;; Throwable rather than Exception: an Error that killed
                  ;; the producer silently would leave the consumer
                  ;; blocked forever on (.take channel).
                  (catch Throwable e
                    (.put channel #(throw e)))))
         drain (fn drain []
                 (lazy-seq
                   (try
                     ;; Run the next instruction; its result is the item.
                     (cons ((.take channel)) (drain))
                     (catch InterruptedException e nil))))]
     ;; Start producing in the background; the future itself is not needed.
     (future (fill s))
     (map #(if (identical? % NIL) nil %) (drain)))))
(defn trickle
  "Returns a lazy seq of the items of slow, sleeping 100 ms before
  handing back each item as it is realized."
  [slow]
  (map (fn [item]
         (Thread/sleep 100)
         item)
       slow))
(deftest identity?
  ;; With the default buffer, seque yields the same items in the same
  ;; order as its input.
  (let [input (range 10)]
    (is (= (seque input) input))))
(deftest priority
  ;; Only the number of items is checked here, not their order.
  (let [queued (seque (PriorityBlockingQueue.) [:a :b :c :a])]
    (is (= 4 (count queued)))))
(deftest synchronous
  ;; seque backed by a caller-supplied SynchronousQueue must still
  ;; deliver every item in order.
  (let [handoff  (SynchronousQueue.)
        expected (range 10)]
    (is (= (seque handoff expected) expected))))
(deftest nils
  ;; nil, false and an empty collection must all come out of seque
  ;; unchanged.
  (let [items [nil true false []]]
    (is (= (seque items) items))))
(deftest errors
  ;; An exception raised while realizing the source seq must reach
  ;; whoever consumes the seque.
  (let [exploding (map (fn [i] (throw (Exception. (str i))))
                       (range 10))]
    (is (thrown? Throwable
                 (dorun (seque exploding))))))
(deftest threads
  ;; Two threads fully realize the same seque; both must see the whole
  ;; input in order.
  (let [shared   (seque (SynchronousQueue.) (range 10))
        reader-a (future (doall shared))
        reader-b (future (doall shared))]
    (is (= (range 10)
           (deref reader-a)
           (deref reader-b)))))
(defn moronic [] ; untestable
  ;; Builds a seque over a bounded queue that two background threads
  ;; are also touching: one polls items off it, the other offers :foo
  ;; onto it, 1000 times each.
  (let [q      (LinkedBlockingQueue. 100)
        hammer (fn [op]
                 (future
                   (dotimes [_ 1000]
                     (Thread/yield)
                     (op))))]
    (hammer #(.poll q))
    (hammer #(.offer q :foo))
    (seque q (range 1000))))
I suggest you add your problem and solution to the Jira issue: http://dev.clojure.org/jira/browse/CLJ-776
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
FWIW, I was running into an apparent deadlock using 1.3's seque, so I tried the above. It wouldn't compile without a
(loop [s s]
inserted after the try in fill, but with that change it fixed my deadlock. The code I was testing (from the repl) is: