Skip to content

Instantly share code, notes, and snippets.

@raek
Created July 1, 2010 20:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raek/460527 to your computer and use it in GitHub Desktop.
Save raek/460527 to your computer and use it in GitHub Desktop.
;; By Rasmus Svensson. No copyright.
(ns se.raek.pipes
(:require [clojure.java.io :as io])
(:use (clojure.contrib def json))
(:import (java.io EOFException BufferedReader BufferedWriter StringReader PrintWriter)
(javax.swing JOptionPane)
(java.util.concurrent BlockingQueue LinkedBlockingQueue SynchronousQueue)))
(defprotocol Source
  "Something that objects can be taken from, one at a time."
  (take! [source] [source end-value]
    "Removes the next object from the source and returns it.
  When the source is closed, end-value is returned instead; if no
  end-value was supplied, nil is returned."))
(defprotocol Sink
  "Something that objects can be put into until it is closed."
  (put! [sink item]
    "Delivers item to the sink. An IllegalStateException is thrown when the
  sink has already been closed.")
  (close! [sink]
    "Marks the sink as closed; after this no further objects can be put
  into it."))
;; Sentinel objects. Each one is a fresh java.lang.Object, so it can never be
;; equal to an item supplied by a caller.

;; Passed as the end-value to take! so that a closed source can be told apart
;; from a source that legitimately produced nil.
(defonce- end-of-stream (Object.))
;; Stored in a Pipe's queue in place of a nil item and turned back into nil
;; when the item is taken out again.
(defonce- nil-substitute (Object.))
;; Stored in a Pipe's queue by close!; when take! encounters it, the source
;; side of the pipe is marked as closed.
(defonce- plug (Object.))
(defn source-seq
  "Returns a lazy sequence of the items taken from source, ending when the
  source reports that it is closed. The sequence holds every item coming out
  of the source only if nothing else is taking from that source as well."
  [source]
  (lazy-seq
    (let [taken (take! source end-of-stream)]
      ;; The private end-of-stream sentinel marks a closed source; returning
      ;; nil here terminates the lazy sequence.
      (when (not= taken end-of-stream)
        (cons taken (source-seq source))))))
(defnk make-feeder
  "Spawns a future that puts each element of from-seq into sink, in order.
  from-seq is typically a lazy sequence whose realization may block, which
  is why the work happens on a separate thread. When the keyword option
  :close-when-done is true, the sink is closed once from-seq is exhausted.
  Returns the future."
  [sink from-seq :close-when-done false]
  (future
    (doseq [item from-seq]
      (put! sink item))
    (when close-when-done
      (close! sink))))
(defnk make-drainer
  "Spawns a future that moves items from source to sink until the source is
  closed. When the keyword option :close-when-done is true, the sink is
  closed once the source has been drained. Returns the future."
  [source sink :close-when-done false]
  (future
    (loop [item (take! source end-of-stream)]
      (if (not= item end-of-stream)
        (do (put! sink item)
            (recur (take! source end-of-stream)))
        ;; The source is closed: optionally propagate the close to the sink.
        (when close-when-done
          (close! sink))))))
(defn make-walker
  "Spawns a future that steps through coll -- typically a lazy sequence whose
  realization may block -- and keeps an atom updated with the most recent
  tail it has reached. This offers a way to \"jump in\" at the last known
  part of a sequence without having to hold on to its head.

  Returns a map with the atom under :tail and the future under :future."
  [coll]
  (let [latest (atom coll)
        walker (future
                 (loop [remaining coll]
                   ;; Publish the current position before trying to advance.
                   (reset! latest remaining)
                   (when (seq remaining)
                     (recur (rest remaining)))))]
    {:tail latest, :future walker}))
;; A queue-backed pipe that is both a Source and a Sink.
;;   queue          blocking queue holding the items in transit
;;   source-closed  atom, true once the plug has been taken out of the queue
;;   source-lock    monitor serializing take! calls
;;   sink-closed    atom, true once close! has been called
;;   sink-lock      monitor serializing put! and close! calls
(defrecord Pipe [queue source-closed source-lock sink-closed sink-lock]
  Source
  (take! [source]
    (take! source nil))
  (take! [source end-value]
    (locking source-lock
      (if-not @source-closed
        (let [taken (.take queue)]
          (cond
            ;; nil items travel through the queue as a stand-in object.
            (= nil-substitute taken) nil
            ;; The plug is what close! puts in the queue: remember that the
            ;; source end is closed and report the end value.
            (= plug taken) (do (compare-and-set! source-closed false true)
                               end-value)
            :else taken))
        end-value)))
  Sink
  (put! [sink item]
    (locking sink-lock
      (when @sink-closed
        (throw (IllegalStateException. "The pipe is closed.")))
      (.put queue (if (nil? item) nil-substitute item))))
  (close! [sink]
    (locking sink-lock
      ;; Only the first close! enqueues the plug.
      (when (compare-and-set! sink-closed false true)
        (.put queue plug)))))
(defn make-pipe
  "Creates a pipe, an object usable both as a Sink and as a Source. Items put
  into it are held in a queue until they are taken out again."
  []
  (let [queue         (LinkedBlockingQueue.)
        source-closed (atom false)
        source-lock   (Object.)
        sink-closed   (atom false)
        sink-lock     (Object.)]
    (Pipe. queue source-closed source-lock sink-closed sink-lock)))
;; (defn make-rendezvous
;; "Makes a rendezvous object, which can be used both as a Source and a Sink.
;; The put! method will block until a take! is made and vice versa."
;; []
;; (Rendezvous. (new SynchronousQueue) ...))
;; A Source that asks for each item through a Swing input dialog that has
;; the given title and message.
(defrecord DialogSource [title message]
  Source
  ;; Returns whatever the input dialog returns.
  (take! [this]
    (JOptionPane/showInputDialog
      nil message title JOptionPane/QUESTION_MESSAGE))
  ;; Like the one-argument form, but a nil result from the dialog is
  ;; reported as end-value.
  (take! [this end-value]
    (let [answer (JOptionPane/showInputDialog
                   nil message title JOptionPane/QUESTION_MESSAGE)]
      (if answer
        answer
        end-value))))
(defn make-dialog-source
  "Creates a DialogSource whose input dialogs use the given title and
  message."
  [title message]
  (new DialogSource title message))
;; A Sink that displays each item in a Swing message dialog with the given
;; title.
(defrecord DialogSink [title]
  Sink
  ;; Shows the string form of item in a plain message dialog.
  (put! [this item]
    (let [text (str item)]
      (JOptionPane/showMessageDialog
        nil text title JOptionPane/PLAIN_MESSAGE)))
  ;; Only announces the close in a dialog; no closed state is recorded here.
  (close! [this]
    (JOptionPane/showMessageDialog
      nil "The sink is now closed." title
      JOptionPane/INFORMATION_MESSAGE)))
(defn make-dialog-sink
  "Creates a DialogSink whose message dialogs use the given title."
  [title]
  (new DialogSink title))
;; A Source that yields one line of text per take!, read from reader via
;; its readLine method.
(defrecord LineSource [reader]
  Source
  ;; Returns whatever readLine returns.
  (take! [this]
    (.readLine reader))
  ;; A nil result from readLine is reported as end-value.
  (take! [this end-value]
    (let [line (.readLine reader)]
      (if line
        line
        end-value))))
(defn make-line-source
  "Makes a source that yields lines of text read from in. in and any options
  are passed to clojure.java.io/reader to obtain the reader the lines are
  read from."
  [in & options]
  ;; The docstring must come before the argument vector; placed after it, the
  ;; string is just a discarded expression in the function body.
  (LineSource. (apply io/reader in options)))
;; A Sink that writes the string form of each item to writer as one line and
;; flushes after every item. writer must support write, newLine, flush and
;; close (e.g. a java.io.BufferedWriter).
(defrecord LineSink [writer]
  Sink
  (put! [sink item]
    (try
      (doto writer
        (.write (str item))
        (.newLine)
        (.flush))
      (catch EOFException _
        ;; throw takes exactly one Throwable instance, so the exception has
        ;; to be constructed here rather than passing the class and message
        ;; as two separate arguments.
        (throw (IllegalStateException. "The writer is closed.")))))
  (close! [sink]
    (.close writer)))
(defn make-line-sink
  "Makes a sink that writes each item as a line of text to out. out and any
  options are passed to clojure.java.io/writer to obtain the writer. A
  java.net.Socket is replaced by its output stream first."
  [out & options]
  ;; Making a OutputStream of a Socket is currently broken in clojure.java.io/writer
  (let [target (if (instance? java.net.Socket out)
                 (.getOutputStream out)
                 out)]
    (LineSink. (apply io/writer target options))))
;; (defrecord JSONSource [reader keywordize]
;; Source
;; (take! [source]
;; (read-json reader keywordize false nil))
;; (take! [source end-value]
;; (read-json reader keywordize false end-value)))
;; A Source that reads one JSON value per take! from reader using read-json.
;; keywordize is handed to read-json as its keywordize argument.
(defrecord JSONSource [reader keywordize]
  Source
  ;; An IllegalArgumentException raised while reading is reported as nil.
  (take! [this]
    (try
      (read-json reader keywordize false nil)
      (catch IllegalArgumentException _ nil)))
  ;; An IllegalArgumentException raised while reading is reported as
  ;; end-value, which is also handed to read-json as its end-of-input value.
  (take! [this end-value]
    (try
      (read-json reader keywordize false end-value)
      (catch IllegalArgumentException _ end-value))))
(defn make-json-source
  "Makes a source that yields JSON values read from in. When in is a string,
  its contents are read as JSON text; otherwise in and the options are
  passed to clojure.java.io/reader to obtain the reader.

  Options are key/value pairs. :keywordize (default true) is handed to the
  JSON reader as its keywordize argument."
  [in & options]
  (let [opts       (apply hash-map options)
        ;; Look the key up in the parsed option map. A lookup in the raw
        ;; rest-args seq always falls through to the default, which made
        ;; :keywordize false impossible to request.
        keywordize (get opts :keywordize true)
        reader     (if (string? in)
                     (StringReader. in)
                     (apply io/reader in options))]
    (JSONSource. reader keywordize)))
;; A Sink that writes each item as JSON to writer, followed by a line break
;; and a flush. writer must support println (e.g. a java.io.PrintWriter).
(defrecord JSONSink [writer]
  Sink
  (put! [this item]
    (try
      (write-json item writer)
      (doto writer
        (.println)
        (.flush))
      (catch EOFException _
        (throw (IllegalStateException. "The writer is closed.")))))
  (close! [this]
    (.close writer)))
(defn make-json-sink
  "Makes a sink that writes each item as a line of JSON to out. A PrintWriter
  is used as-is. For a java.net.Socket, its output stream is wrapped. Anything
  else is passed, together with the options, to clojure.java.io/writer and
  the result is wrapped in a PrintWriter."
  [out & options]
  (let [print-writer (cond
                       (instance? PrintWriter out)
                       out

                       (instance? java.net.Socket out)
                       (PrintWriter. (apply io/writer (.getOutputStream out) options))

                       ;; Was misspelled :esle, which only worked because any
                       ;; keyword is truthy.
                       :else
                       (PrintWriter. (apply io/writer out options)))]
    (JSONSink. print-writer)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment