Alternative implementation of clojure.core/seque

test.clj
Clojure
(ns test
  (:import [java.util.concurrent
            BlockingQueue
            LinkedBlockingQueue
            SynchronousQueue
            PriorityBlockingQueue
            CyclicBarrier])
  (:use clojure.test)
  (:refer-clojure :exclude [seque]))
 
;; some BlockingQueues are...
;; not ordered
;; not finite
;; unable to contain nil
;; without content
;; mutable
;; dropping or receiving extra items
 
;; seque...
;; fills the queue on another thread
;; propagates errors
;; replaces nil with a sentinel
;; detects the end of the input seq
;; can be consumed on any thread
 
(defn seque
  "Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer."
  {:added "1.0"}
  ([s] (seque 100 s))
  ([n-or-q s]
     (let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
                              n-or-q
                              (LinkedBlockingQueue. (int n-or-q)))
           NIL (Object.) ;nil sentinel since BQ doesn't support nils
           s (map (fnil identity NIL) s)
           channel (LinkedBlockingQueue.)
           fill (fn fill [s]
                  (try
                    (if (seq s)
                      (do
                        (.put channel #(.take q))
                        (.put q (first s))
                        (recur (next s)))
                      (.put channel #(throw (InterruptedException.))))
                    (catch Exception e
                      (.put channel #(throw e)))))
           fut (future (fill s))
           drain (fn drain []
                   (lazy-seq
                     (try
                       (cons ((.take channel)) (drain))
                       (catch InterruptedException e nil))))]
       (map #(if (identical? % NIL) nil %) (drain)))))
 
(defn trickle
  [slow]
  (map #(do (Thread/sleep 100) %) slow))

(deftest identity?
  (is (= (seque (range 10)) (range 10))))

(deftest priority
  (is (= 4 (count (seque (PriorityBlockingQueue.)
                         [:a :b :c :a])))))

(deftest synchronous
  (is (= (seque (SynchronousQueue.)
                (range 10))
         (range 10))))

(deftest nils
  (is (= (seque [nil true false []])
         [nil true false []])))

(deftest errors
  (is (thrown? Throwable
               (dorun (seque (map #(throw (Exception. (str %))) (range 10)))))))

(deftest threads
  (let [s (seque (SynchronousQueue.) (range 10))
        f1 (future (doall s))
        f2 (future (doall s))]
    (is (= (range 10) @f1 @f2))))
 
(defn moronic [] ; untestable
  (let [q (LinkedBlockingQueue. 100)]
    (future (dotimes [_ 1000] (Thread/yield) (.poll q)))
    (future (dotimes [_ 1000] (Thread/yield) (.offer q :foo)))
    (seque q (range 1000))))
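
The header comments in test.clj note that some BlockingQueues are unable to contain nil, which is why seque replaces nil with a sentinel. A minimal sketch of that round trip, kept separate from seque itself; the helper names `encode`, `decode`, `rejected?` and `round-trip` are hypothetical and not part of the gist:

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

;; Hypothetical sentinel: a unique object that stands in for nil.
(def NIL (Object.))

(defn encode [x] (if (nil? x) NIL x))            ; nil -> sentinel
(defn decode [x] (if (identical? x NIL) nil x))  ; sentinel -> nil

(def ^LinkedBlockingQueue q (LinkedBlockingQueue. 10))

;; Putting a raw nil is rejected by the queue.
(def rejected?
  (try
    (.put q nil)
    false
    (catch NullPointerException _ true)))

;; Encoded values pass through the queue and decode back unchanged.
(doseq [x [nil true false []]]
  (.put q (encode x)))

(def round-trip (vec (repeatedly 4 #(decode (.take q)))))
```

Comparing with `identical?` rather than `=` means only the sentinel object itself is mapped back to nil, so `false` and empty collections survive untouched.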

FWIW, I was running into an apparent deadlock using 1.3's seque, so I tried the above. It wouldn't compile without a (loop [s s] inserted after the try in fill, but with that change it fixed my deadlock. The code I was testing (from the REPL) is:

(let [xs (seque (range 0 150000))
      ys (seque (filter odd? xs))
      s (future (apply + ys))]
  @s)
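
For reference, the loop change described in this comment can be sketched as follows. This is a sketch under assumptions, not the gist author's code: the namespace name `seque-sketch` is hypothetical, and the only intended difference from the gist's seque is the `(loop [s s] ...)` placed directly inside the `try` in `fill`, so that `recur` targets the loop instead of crossing the `try` boundary (which, as far as I can tell, is what the compiler rejects).

```clojure
(ns seque-sketch ; hypothetical namespace name
  (:import [java.util.concurrent BlockingQueue LinkedBlockingQueue])
  (:refer-clojure :exclude [seque]))

(defn seque
  ([s] (seque 100 s))
  ([n-or-q s]
   (let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
                            n-or-q
                            (LinkedBlockingQueue. (int n-or-q)))
         NIL (Object.) ; nil sentinel, since BlockingQueue rejects nil
         s (map (fnil identity NIL) s)
         channel (LinkedBlockingQueue.)
         fill (fn [s]
                (try
                  ;; loop inside the try: recur now targets this loop
                  (loop [s s]
                    (if (seq s)
                      (do
                        (.put channel #(.take q))
                        (.put q (first s))
                        (recur (next s)))
                      ;; end of input: signal the consumer to stop
                      (.put channel #(throw (InterruptedException.)))))
                  (catch Exception e
                    ;; propagate producer errors to the consumer
                    (.put channel #(throw e)))))
         fut (future (fill s))
         drain (fn drain []
                 (lazy-seq
                   (try
                     (cons ((.take channel)) (drain))
                     (catch InterruptedException e nil))))]
     (map #(if (identical? % NIL) nil %) (drain)))))
```

With this version loaded, the nested seque expression above should return the sum of the odd numbers below 150000 instead of hanging.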

I suggest you add your problem and solution to the Jira issue: http://dev.clojure.org/jira/browse/CLJ-776
