Skip to content

Instantly share code, notes, and snippets.

@raek
Created February 22, 2010 15:09
Show Gist options
  • Save raek/311139 to your computer and use it in GitHub Desktop.
Save raek/311139 to your computer and use it in GitHub Desktop.
;; By Rasmus Svensson. No copyright.
(ns pipes
(:import [java.io BufferedReader BufferedWriter]
[java.util.concurrent BlockingQueue LinkedBlockingQueue SynchronousQueue]))
(defprotocol Source
(take! [source] "Retrieves and removes the next object from the source."))
(defprotocol Sink
(put! [sink x] "Adds x to the sink as the last object."))
(defn source-seq [source]
(repeatedly (take! source)))
(extend-class BlockingQueue
Source
(take! [source]
(.take source))
Sink
(put! [sink x]
(.put sink x)))
(extend-class BufferedReader
Source
(take! [source]
(locking source
(.readLine source))))
(extend-class BufferedWriter
Sink
(put! [sink x]
(locking sink
(.write sink (str x))
(.newLine sink)
(.flush sink))))
(defn make-pipe
"Makes a pipe object, which can be used both as a Source and a Sink. Objects
put into the sink will be stored in a queue until they are taken from the
source."
[]
(new LinkedBlockingQueue))
(defn make-rendezvous
"Makes a rendezvous object, which can be used both as a Source and a Sink.
The putting of an object into the sink will block until take! is made from
the source."
[]
(new SynchronousQueue))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment