Create a gist now

Instantly share code, notes, and snippets.

Alternative implementation of clojure.core/seque
(ns test
(:import [java.util.concurrent
(:use clojure.test)
(:refer-clojure :exclude [seque]))
;; some BlockingQueues are...
;; not ordered
;; not finite
;; unable to contain nil
;; without content
;; mutable
;; dropping or recieving extra items
;; seque...
;; fills the que on another thread
;; propagates errors
;; replaces nil with a sentinel
;; detects the end of the input seq
;; can be consumed on any thread
(defn seque
"Creates a queued seq on another (presumably lazy) seq s. The queued
seq will produce a concrete seq in the background, and can get up to
n items ahead of the consumer. n-or-q can be an integer n buffer
size, or an instance of java.util.concurrent BlockingQueue. Note
that reading from a seque can block if the reader gets ahead of the
{:added "1.0"}
([s] (seque 100 s))
([n-or-q s]
(let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since BQ doesn't support nils
s (map (fnil identity NIL) s)
channel (LinkedBlockingQueue.)
fill (fn fill [s]
(if (seq s)
(.put channel #(.take q))
(.put q (first s))
(recur (next s)))
(.put channel #(throw (InterruptedException.))))
(catch Exception e
(.put channel #(throw e)))))
fut (future (fill s))
drain (fn drain []
(cons ((.take channel)) (drain))
(catch InterruptedException e nil))))]
(map #(if (identical? % NIL) nil %) (drain)))))
(defn trickle
(map #(do (Thread/sleep 100) %) slow))
(deftest identity?
(is (= (seque (range 10)) (range 10))))
(deftest priority
(is (= 4 (count (seque (PriorityBlockingQueue.)
[:a :b :c :a])))))
(deftest synchronous
(is (= (seque (SynchronousQueue.)
(range 10))
(range 10))))
(deftest nils
(is (= (seque [nil true false []])
[nil true false []])))
(deftest errors
(is (thrown? Throwable
(dorun (seque (map #(throw (Exception. (str %))) (range 10)))))))
(deftest threads
(let [s (seque (SynchronousQueue.) (range 10))
f1 (future (doall s))
f2 (future (doall s))]
(is (= (range 10) @f1 @f2))))
(defn moronic [] ; untestable
(let [q (LinkedBlockingQueue. 100)]
(future (dotimes [_ 1000] (Thread/yield) (.poll q)))
(future (dotimes [_ 1000] (Thread/yield) (.offer q :foo)))
(seque q (range 1000))))

FWIW, I was running into an apparent deadlock using 1.3's seque, so I tried the above. It wouldn't compile without a
(loop [s s]
inserted after the try in fill, but with that change it fixed my deadlock. The code I was testing (from the repl) is:

(let [xs (seque (range 0 150000))
      ys (seque (filter odd? xs))
      s (future (apply + ys))]

I suggest you add your problem and solution to he Jira issue:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment