@astrangeguy
Created June 10, 2010 21:57
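(import java.util.concurrent.LinkedBlockingQueue
        java.util.concurrent.atomic.AtomicReference)

;; Returns [produce items]: `produce` is a function that feeds the pipe and
;; `items` is a lazy seq that blocks until the next item arrives.
(defn pipe []
  (let [q    (LinkedBlockingQueue.)
        qr   (AtomicReference. q)   ; holds q while open, nil once closed
        NULL (Object.)              ; stands in for nil, which the queue rejects
        END  (Object.)              ; marks the end of the stream
        produce (fn produce
                  ;; (produce item) enqueues item; throws once the pipe is closed
                  ([item] (if-let [^LinkedBlockingQueue q (.get qr)]
                            (.add q (if (nil? item) NULL item))
                            (throw (IllegalStateException. "Pipe closed"))))
                  ;; (produce) closes the pipe; repeated closes are no-ops
                  ([] (when-let [^LinkedBlockingQueue q (.getAndSet qr nil)]
                        (.add q END))))
        consume (fn consume []
                  (lazy-seq
                    (let [v (.take q)]   ; blocks until an item is available
                      ;; sentinels are compared by identity, not by value
                      (condp identical? v
                        NULL (cons nil (consume))
                        END  nil
                        (cons v (consume))))))]
    [produce (consume)]))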
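
;; Usage sketch (not part of the original gist): a hypothetical REPL session,
;; assuming the definitions above are loaded. One thread feeds the pipe while
;; the caller realizes the lazy seq; the names `items` and `x` are illustrative.
(comment
  (let [[produce items] (pipe)]
    (future
      (doseq [x [1 nil 3]]
        (produce x))      ; nil items are allowed
      (produce))          ; the zero-argument call closes the pipe
    (doall items))        ; blocks until the pipe is closed
  ;; => (1 nil 3)
  )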