Last active
October 1, 2018 13:58
-
-
Save KingCode/8bc23748acb0b0421a2b2f96a15d207a to your computer and use it in GitHub Desktop.
An implementation of the transducer API from scratch, as a learning aid.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns scratch.transducer | |
(:refer-clojure :exclude [comp map filter mapcat take drop | |
take-while drop-while | |
dedupe | |
distinct | |
reductions | |
;; sequence | |
transduce])) | |
;; STEP FUNCTION: | |
;; The traditional 2-arity function passed to clojure.core/reduce, | |
;; which takes the accumulator and the element being processed, and yields | |
;; the accumulator for the next step. When all items have been processed, | |
;; the accumulator is the result. Any 2-arity fn, including | |
;; all arithmetic fns, conj, cons, concat etc., can be a step function in a reducing | |
;; context, e.g. (reduce #(/ % %2) [1000 5 2 4 5 5]) ;; => 1 | |
;; REDUCING FUNCTION: | |
;; A step function with 1- and 0-arity versions added. The 0-arity provides | |
;; an initial value, and the 1-arity yields the result (after all elements | |
;; have been processed). | |
;; The reducing function is the processor fn for the pipeline, and is usually | |
;; the result of nesting possibly many reducing functions, stitched together by | |
;; the transducer middleware. | |
;; Examples: +, *, conj (but not e.g. / or cons, unless adding custom arities) | |
;; TRANSDUCER: | |
;; A transducer is a function taking a reducing function (RF, or rf) F and returning | |
;; another reducing function G - in other words, transducers are middleware | |
;; for reducing functions. | |
;; | |
;; When invoked, G invokes F after doing its own work - usually invoking | |
;; the next rf in the pipeline, which could also be the last, e.g. conj | |
;; for a sequential output coll. | |
;; | |
;; Transducers are thus composable and the glue of the processing pipeline. | |
;; They enable decoupling of processing from both input data and output format. | |
;; Each element goes through the entire pipeline operation at once (an added | |
;; performance bonus). | |
;; | |
;; Transducers are created by transducer generating functions (factories) | |
;; such as #map, #filter et al.
;; | |
;; In order to be API compliant, a transducer must return a full, 3-arity
;; reducing function, or exceptions will occur when the transducing context | |
;; tries to invoke the missing arities. | |
;; | |
;; The transducer factory fn usually takes a 1-arity function argument, | |
;; which is invoked on the current element within the step function. | |
;; | |
;; TRANSDUCING CONTEXT: | |
;; A transducing context puts together a transducer and input | |
;; and output methods (streams, collections, channels etc.) | |
;; Examples are transduce, sequence, 3-arity into, eduction. | |
(defn comp
  "Composes functions right to left: ((comp f g h) x) => (f (g (h x))).
  The rightmost function receives all the call arguments; every other
  function receives the single result of the one to its right.

  With no arguments returns identity; with one function returns it as is.
  The composition is built once, when comp is called, rather than being
  rebuilt on every invocation of the composed function."
  ([] identity)
  ([f] f)
  ([f & fs]
   ;; Hoisted: compose the tail a single time, not per call.
   (let [inner (apply comp fs)]
     (fn [& xs]
       (f (apply inner xs))))))
(defn reductions
  "Returns a seq of the intermediate values of reducing coll with f,
  starting with init (or with the first element of coll when no init
  is given). Eager: the whole result is computed before returning.

  - (reductions f coll) on an empty coll returns ((f)).
  - If f returns a reduced value, its unwrapped value is the last entry
    and the remaining elements are not processed."
  ([f coll]
   (if-let [s (seq coll)]
     (reductions f (first s) (rest s))
     (list (f))))
  ([f init coll]
   (seq
    (reduce (fn [accs x]
              ;; peek reads the end of a vector in constant time,
              ;; whereas last walks the whole vector.
              (let [v (f (peek accs) x)]
                (if (reduced? v)
                  (reduced (conj accs @v))
                  (conj accs v))))
            [init]
            coll))))
(defn rf-addenda
  "Builds, as data, the two arity clauses that delegate to the reducing
  function named by rf-sym: a 1-arity clause passing its argument through
  and a 0-arity clause calling it with no arguments.
  (rf-addenda 'rf) => [([acc] (rf acc)) ([] (rf))]"
  [rf-sym]
  (let [acc-sym 'acc
        completion-clause (list [acc-sym] (list rf-sym acc-sym))
        init-clause (list [] (list rf-sym))]
    [completion-clause init-clause]))
;; (as-rf ([acc x] (rf acc (f x))) rf) | |
;; => (fn ([acc x] ...) ([acc] (my-rf acc)) ([] (my-rf)) | |
(defmacro as-rf
  "Expands to a 3-arity reducing function.

  step-fn-args+body is the 2-arity clause, written as ([acc x] body...).
  rf-sym names the downstream reducing function; the generated 1-arity
  and 0-arity clauses simply delegate to it.

  The 1-arity parameter is an auto-gensym so that it cannot shadow the
  rf expression, even when the caller's rf is bound to a local named acc."
  [step-fn-args+body rf-sym]
  `(fn
     ~step-fn-args+body
     ([acc#] (~rf-sym acc#))
     ([] (~rf-sym))))
;; | |
;; Functions yielding transducers | |
;; | |
(defn mapping
  "Returns a transducer that applies f to each input and passes the
  result to the downstream reducing function."
  [f]
  (fn [rf]
    (as-rf
     ([result input]
      (let [mapped (f input)]
        (rf result mapped)))
     rf)))
(defn filtering
  "Returns a transducer that forwards an input downstream only when
  (pred input) is truthy; otherwise the accumulator is returned unchanged."
  [pred]
  (fn [rf]
    (as-rf
     ([result input]
      (cond-> result
        (pred input) (rf input)))
     rf)))
(defn removing
  "Returns a transducer that drops every input for which pred is truthy;
  the inverse of filtering."
  [pred]
  (filtering (fn [input] (not (pred input)))))
(defn mapcatting
  "Returns a transducer that applies f to each input, expecting a
  collection back, and feeds every element of that collection to the
  downstream reducing function.

  Early termination is preserved: when the downstream rf returns a
  reduced value while the inner collection is being consumed, that
  signal is propagated so the outer reduction stops as well."
  [f]
  (fn [rf]
    (let [;; The inner reduce strips one layer of reduced; wrapping a
          ;; second layer keeps the value reduced for the outer reduce.
          preserving-rf (fn [acc v]
                          (let [ret (rf acc v)]
                            (if (reduced? ret)
                              (reduced ret)
                              ret)))]
      (as-rf
       ([acc x]
        (reduce preserving-rf acc (f x)))
       rf))))
;; utility for count-down state | |
(defn done-pred
  "Returns a stateful zero-argument predicate: it answers false for the
  first `steps` calls and true on every call after that. With steps of
  zero or less it answers true from the first call."
  [steps]
  (let [remaining (atom (dec steps))]
    (fn []
      ;; Once the counter has gone negative it is left alone.
      (or (neg? @remaining)
          (do (swap! remaining dec)
              false)))))
;; utility for a predicate-based state that switches from on to off exactly once.
(defn on->off-pred
  "Wraps pred in a stateful one-way switch. While the switch is on, the
  returned fn answers (pred v); the first falsey answer turns the switch
  off. Once off, the returned fn answers false without calling pred."
  [pred]
  (let [active? (atom true)]
    (fn [v]
      (if-not @active?
        false
        (let [verdict (pred v)]
          (when-not verdict
            (reset! active? false))
          verdict)))))
;; | |
;; Functions yielding stateful transducers | |
;; | |
(defn taking
  "Returns a stateful transducer that passes the first n inputs
  downstream and then terminates the reduction.

  Termination is signalled together with the nth input (the result of
  that step is wrapped with ensure-reduced), so no input beyond the nth
  is requested from the source. With n of zero or less, the reduction
  is terminated on the first input without forwarding it."
  [n]
  (fn [rf]
    (let [remaining (atom n)]
      (as-rf
       ([acc x]
        (let [left (swap! remaining dec)]
          (cond
            ;; n was already exhausted (n <= 0): forward nothing.
            (neg? left) (reduced acc)
            ;; nth input: forward it and stop right away.
            (zero? left) (ensure-reduced (rf acc x))
            :else (rf acc x))))
       rf))))
(defn taking-while
  "Returns a stateful transducer that forwards inputs downstream while
  pred holds, and terminates the reduction (via reduced) on the first
  input for which it does not."
  [pred]
  (fn [rf]
    (let [still-taking? (on->off-pred pred)]
      (as-rf
       ([result input]
        (if-not (still-taking? input)
          (reduced result)
          (rf result input)))
       rf))))
(defn dropping
  "Returns a stateful transducer that ignores the first n inputs and
  forwards every input after them to the downstream reducing function."
  [n]
  (fn [rf]
    (let [past-prefix? (done-pred n)]
      (as-rf
       ([result input]
        (if-not (past-prefix?)
          result
          (rf result input)))
       rf))))
(defn dropping-while
  "Returns a stateful transducer that ignores inputs while pred holds;
  from the first input for which pred fails onward, every input is
  forwarded downstream."
  [pred]
  (fn [rf]
    (let [skip? (on->off-pred pred)]
      (as-rf
       ([result input]
        (cond-> result
          (not (skip? input)) (rf input)))
       rf))))
(defn distinct
  "Returns a stateful transducer that forwards each distinct input
  downstream once, ignoring later repetitions.

  Membership is tested with contains?, so nil and false inputs are
  de-duplicated like any other value (invoking the set as a function
  would return the falsey element itself and never register it as seen)."
  []
  (fn [rf]
    (let [seen (atom #{})]
      (as-rf
       ([acc x]
        (if (contains? @seen x)
          acc
          (do (swap! seen conj x)
              (rf acc x))))
       rf))))
(defn dedupe
  "Returns a stateful transducer that removes consecutive duplicate
  inputs, forwarding only the first of each run.

  The previous-input slot starts as a private sentinel rather than nil,
  so a leading nil input is forwarded instead of being mistaken for a
  repeat."
  []
  (fn [rf]
    (let [prev (atom ::none)]
      (as-rf
       ([acc x]
        (if (= @prev x)
          acc
          (do (reset! prev x)
              (rf acc x))))
       rf))))
;; Function which puts together transducers, input and output.
;; xf is the transducer used (or a composition of transducers),
;; f is a reducing function aggregating the output, and
;; init is the initial output value (when omitted, it defaults to invoking the 0-arity f).
;; | |
(defn transduce
  "Reduces coll with the reducing function obtained by applying the
  transducer xf to f, then calls the completion (1-arity) step of that
  function on the final accumulator and returns its result.

  xf   - a transducer (or a composition of transducers)
  f    - the reducing function aggregating the output
  init - the initial accumulator; when omitted, (f) is used
  coll - the input collection"
  ([xf f coll]
   (transduce xf f (f) coll))
  ([xf f init coll]
   (let [rf (xf f)]
     ;; The completion step lets transducers and f finalise the result.
     (rf (reduce rf init coll)))))
(defn map-reduced
  "Maps f over coll, stopping after the first result that is a reduced
  value (that result is included, still wrapped). The first result is
  computed immediately; the remainder is produced lazily. Returns ()
  for an empty coll."
  [f coll]
  (if-let [items (seq coll)]
    (let [result (f (first items))
          tail (if (reduced? result)
                 ()
                 (lazy-seq (map-reduced f (next items))))]
      (cons result tail))
    ()))
#_(defn append [f [x & xs]] | |
(if-not x | |
() | |
(let [v (f x)] | |
(cons v | |
(lazy-seq (append f xs)))))) | |
;;;;;; utilities and wrappers ;;;;; | |
(defn append
  "Reducing function that accumulates items in arrival order.
  0-arity: the empty list, used as the initial accumulator.
  1-arity: completion step, returns coll unchanged.
  2-arity: coll with x added at the end, as a lazy concatenation."
  ([] ())
  ([coll] coll)
  ([coll x]
   ;; NOTE(review): every call wraps the previous lazy seq in one more
   ;; concat layer; very long outputs may nest deeply - confirm this is
   ;; acceptable for the intended input sizes.
   (concat coll (list x))))
(defn map
  "Applies f to every element of coll via the mapping transducer,
  collecting the results with append."
  [f coll]
  (let [xf (mapping f)]
    (transduce xf append coll)))
(defn filter
  "Keeps the elements of coll for which f is truthy, via the filtering
  transducer, collecting them with append."
  [f coll]
  (let [xf (filtering f)]
    (transduce xf append coll)))
(defn mapcat
  "Applies f to every element of coll and collects the elements of each
  resulting collection with append, via the mapcatting transducer."
  [f coll]
  (let [xf (mapcatting f)]
    (transduce xf append coll)))
(defn take
  "Collects leading elements of coll with append, using the
  (taking n) transducer."
  [n coll]
  (let [xf (taking n)]
    (transduce xf append coll)))
(defn drop
  "Collects with append the elements of coll that remain after the
  (dropping n) transducer has skipped the leading ones."
  [n coll]
  (let [xf (dropping n)]
    (transduce xf append coll)))
(defn take-while
  "Collects with append the leading elements of coll for which pred
  holds, via the taking-while transducer."
  [pred coll]
  (let [xf (taking-while pred)]
    (transduce xf append coll)))
(defn drop-while
  "Collects with append the elements of coll starting at the first one
  for which pred fails, via the dropping-while transducer."
  [pred coll]
  (let [xf (dropping-while pred)]
    (transduce xf append coll)))
(def pipe-ops
  "Demo pipeline built with comp. Each input flows through the stages
  in the order listed: dropping, taking, filtering, mapping,
  taking-while, dropping-while, mapcatting."
  (comp (dropping 1000000)
        (taking 50)
        (filtering odd?)
        (mapping (fn [n] (rem n 1000000)))
        (taking-while (fn [n] (< n 10)))
        (dropping-while (fn [n] (< n 5)))
        (mapcatting range)))
(defn big-pipe
  "Runs the pipe-ops pipeline over the unbounded (range), collecting
  the output with append."
  []
  (let [naturals (range)]
    (transduce pipe-ops append naturals)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment