Created
July 1, 2010 20:31
-
-
Save raek/460527 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; By Rasmus Svensson. No copyright. | |
(ns se.raek.pipes | |
(:require [clojure.java.io :as io]) | |
(:use (clojure.contrib def json)) | |
(:import (java.io EOFException BufferedReader BufferedWriter StringReader PrintWriter) | |
(javax.swing JOptionPane) | |
(java.util.concurrent BlockingQueue LinkedBlockingQueue SynchronousQueue))) | |
(defprotocol Source | |
(take! [source] [source end-value] | |
"Retrieves and removes the next object from the source. | |
If the source is closed the end-value sentinel object, or nil if it | |
is not given, is returned instead.")) | |
(defprotocol Sink | |
(put! [sink item] | |
"Puts the item into the sink. Throws an IllegalStateException if the | |
sink is closed.") | |
(close! [sink] | |
"Closes the sink so that no more objects can be put into it.")) | |
(defonce- end-of-stream (Object.)) | |
(defonce- nil-substitute (Object.)) | |
(defonce- plug (Object.)) | |
(defn source-seq | |
"Returns a lazy sequence that takes items from source. The source-seq will | |
only contain all items emanating from the source if it is the only process | |
taking from that source." | |
[source] | |
(lazy-seq | |
(let [item (take! source end-of-stream)] | |
(if-not (= item end-of-stream) | |
(cons item (source-seq source)))))) | |
(defnk make-feeder | |
"Starts a separate thread that puts every element of from-seq into sink. | |
from-seq is presumably a lazy sequence whose realization is expected to | |
block." | |
[sink from-seq :close-when-done false] | |
(future (doseq [element from-seq] | |
(put! sink element)) | |
(if close-when-done | |
(close! sink)))) | |
(defnk make-drainer | |
"Drains all items from source into sink. A separate thread is started and | |
takes items from source and puts them into sink until the source is closed." | |
[source sink :close-when-done false] | |
(future | |
(loop [] | |
(let [item (take! source end-of-stream)] | |
(if (= item end-of-stream) | |
(if close-when-done | |
(close! sink)) | |
(do (put! sink item) | |
(recur))))))) | |
(defn make-walker | |
"Starts a separate thread that walks through coll -- presumably a lazy | |
sequence whose realization is expected to block -- and records the last | |
part of its tail that has been realized. This is intended to provide a way | |
to \"jump in\" into the last known part of a sequence without having to | |
retain its head." | |
[coll] | |
(let [a (atom coll) | |
f (future | |
(loop [tail coll] | |
(swap! a (constantly tail)) | |
(if (seq tail) | |
(recur (rest tail)))))] | |
{:tail a, :future f})) | |
(defrecord Pipe [queue source-closed source-lock sink-closed sink-lock] | |
Source | |
(take! [source] | |
(take! source nil)) | |
(take! [source end-value] | |
(locking source-lock | |
(if @source-closed | |
end-value | |
(let [item (.take queue)] | |
(condp = item | |
nil-substitute nil | |
plug (do (compare-and-set! source-closed false true) | |
end-value) | |
item))))) | |
Sink | |
(put! [sink item] | |
(locking sink-lock | |
(if @sink-closed | |
(throw (IllegalStateException. "The pipe is closed.")) | |
(if (nil? item) | |
(.put queue nil-substitute) | |
(.put queue item))))) | |
(close! [sink] | |
(locking sink-lock | |
(if (compare-and-set! sink-closed false true) | |
(.put queue plug))))) | |
(defn make-pipe | |
"Makes a pipe object, which can be used both as a Source and a Sink. Objects | |
put into the sink will be stored in a queue until they are taken from the | |
source." | |
[] | |
(Pipe. (new LinkedBlockingQueue) | |
(atom false) (Object.) | |
(atom false) (Object.))) | |
;; (defn make-rendezvous | |
;; "Makes a rendezvous object, which can be used both as a Source and a Sink. | |
;; The put! method will block until a take! is made and vice versa." | |
;; [] | |
;; (Rendezvous. (new SynchronousQueue) ...)) | |
(defrecord DialogSource [title message] | |
Source | |
(take! [source] | |
(JOptionPane/showInputDialog | |
nil message title JOptionPane/QUESTION_MESSAGE)) | |
(take! [source end-value] | |
(if-let [answer (JOptionPane/showInputDialog | |
nil message title JOptionPane/QUESTION_MESSAGE)] | |
answer | |
end-value))) | |
(defn make-dialog-source [title message] | |
(DialogSource. title message)) | |
(defrecord DialogSink [title] | |
Sink | |
(put! [sink item] | |
(JOptionPane/showMessageDialog | |
nil (str item) title JOptionPane/PLAIN_MESSAGE)) | |
(close! [sink] | |
(JOptionPane/showMessageDialog | |
nil "The sink is now closed." title | |
JOptionPane/INFORMATION_MESSAGE))) | |
(defn make-dialog-sink [title] | |
(DialogSink. title)) | |
(defrecord LineSource [reader] | |
Source | |
(take! [source] | |
(.readLine reader)) | |
(take! [source end-value] | |
(if-let [line (.readLine reader)] | |
line | |
end-value))) | |
(defn make-line-source [in & options] | |
"Makes a source that reads lines from a BufferedReader. The arguments are | |
passed to clojure.java.io/reader to make a reader for x, if it isn't one." | |
(LineSource. (apply io/reader in options))) | |
(defrecord LineSink [writer] | |
Sink | |
(put! [sink item] | |
(try | |
(doto writer | |
(.write (str item)) | |
(.newLine) | |
(.flush)) | |
(catch EOFException _ | |
(throw IllegalStateException "The writer is closed.")))) | |
(close! [sink] | |
(.close writer))) | |
(defn make-line-sink [out & options] | |
"Makes a source that reads lines from a BufferedReader. The arguments are | |
passed to clojure.java.io/reader to make a reader for x, if it isn't one." | |
;; Making a OutputStream of a Socket is currently broken in clojure.java.io/writer | |
(let [out (if (instance? java.net.Socket out) | |
(.getOutputStream out) | |
out)] | |
(LineSink. (apply io/writer out options)))) | |
;; (defrecord JSONSource [reader keywordize] | |
;; Source | |
;; (take! [source] | |
;; (read-json reader keywordize false nil)) | |
;; (take! [source end-value] | |
;; (read-json reader keywordize false end-value))) | |
(defrecord JSONSource [reader keywordize] | |
Source | |
(take! [source] | |
(try | |
(read-json reader keywordize false nil) | |
(catch IllegalArgumentException _ | |
nil))) | |
(take! [source end-value] | |
(try | |
(read-json reader keywordize false end-value) | |
(catch IllegalArgumentException _ | |
end-value)))) | |
(defn make-json-source [in & options] | |
(let [opts (apply hash-map options) | |
keywordize (get options :keywordize true) | |
reader (if (string? in) | |
(StringReader. in) | |
(apply io/reader in options))] | |
(JSONSource. reader keywordize))) | |
(defrecord JSONSink [writer] | |
Sink | |
(put! [sink item] | |
(try | |
(write-json item writer) | |
(.println writer) | |
(.flush writer) | |
(catch EOFException _ | |
(throw (IllegalStateException. "The writer is closed."))))) | |
(close! [sink] | |
(.close writer))) | |
(defn make-json-sink [out & options] | |
(let [print-writer (cond (instance? PrintWriter out) out | |
(instance? java.net.Socket out) (PrintWriter. (apply io/writer (.getOutputStream out) options)) | |
:esle (PrintWriter. (apply io/writer out options)))] | |
(JSONSink. print-writer))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment