Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
scoped streams in clojure
(ns streams.core
(:require [clojure.java.io :as io]))
(def END (Object.))
(defprotocol Stream
(with-generator [_ callback]
"Should call callback with a generator function, finally closing any
resources associated with the stream after the callback returns.
A 'generator' here is a function with no arguments which returns
a new value each time it's called, or the special END object if
it's exhausted.
The idea is that the generators (which are stateful) are an
implementation detail which the end-user needn't directly concern
themselves with, instead can use combinators here to compose and
consume streams in a functional style."))
(extend-protocol Stream
Iterable
(with-generator [iterable callback]
(let [iterator (.iterator iterable)]
(callback (fn [] (if (.hasNext iterator)
(.next iterator)
END))))))
(defn lines-of-file-stream
[file]
(reify Stream
(with-generator [_ callback]
(with-open [reader (io/reader file)]
(callback #(or (.readLine reader) END))))))
(defn map
[f stream]
(reify Stream
(with-generator [_ callback]
(with-generator stream
(fn [generator]
(callback #(let [next (generator)]
(if (identical? next END)
END
(f next)))))))))
(defn filter
[pred stream]
(reify Stream
(with-generator [_ callback]
(with-generator stream
(fn [generator]
(callback #(let [next (generator)]
(cond
(identical? next END) END
(pred next) next
true (recur)))))))))
;; Maybe could supply a CollReduce implementation here?
;; Also note: parallel versions of reduce or fold could be implemented in various ways
;; ontop of the generator interface.
(defn reduce
"Applies a reduction over the stream. Unlike with a lazy sequence
there is no possibility here of holding on to the head."
[identity reduce-fn stream]
(with-generator stream
(fn [generator]
(loop [running-total identity]
(let [next (generator)]
(if (identical? next END)
running-total
(recur (reduce-fn running-total next))))))))
(defn vec
"Realises the whole stream in memory as a vector"
[stream]
(persistent! (reduce (transient []) conj! stream)))
(defn- generator-seq
[generator]
(lazy-seq (let [next (generator)]
(when-not (identical? next END)
(cons next (generator-seq generator))))))
(defn with-lazy-seq
[stream callback]
(with-generator stream
(fn [generator]
(callback (generator-seq generator)))))
(comment
;; e.g.:
(->> (lines-of-file-stream "/tmp/foob")
(map #(.toLowerCase %))
(filter #(< (.length %) 5))
(reduce {} #(merge-with + %1 {%2 1}))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment