Skip to content

Instantly share code, notes, and snippets.

Last active October 1, 2018 13:58
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
An implementation of the transducer API from scratch, as a learning aid.
(ns scratch.transducer
  "An implementation of the transducer API from scratch, as a learning aid."
  ;; Every name excluded here is redefined further down in this namespace;
  ;; excluding it keeps the clojure.core version from being referred in.
  (:refer-clojure :exclude [comp map filter mapcat take drop
                            take-while drop-while
                            ;; sequence
                            reductions distinct dedupe transduce]))
;; A step function is the traditional 2-arity function passed to clojure.core/reduce,
;; which takes the accumulator and the element being processed, and yields
;; the accumulator for the next step. When all items have been processed,
;; the accumulator is the result. Any 2-arity fn, including
;; all arithmetic fns, conj, cons, concat etc., can be a step function in a reducing
;; context, e.g. (reduce #(/ % %2) [1000 5 2 4 5 5]) ;; => 1
;; A reducing function is a step function with 1- and 0-arity versions added. The 0-arity provides
;; an initial value, and the 1-arity yields the result (after all elements
;; have been processed).
;; The reducing function is the processor fn for the pipeline, and is usually
;; the result of nesting possibly many reducing functions, stitched together by
;; the transducer middleware.
;; Examples: +, *, conj (but not e.g. / or cons, unless adding custom arities)
;; A transducer is a function taking a reducing function (RF, or rf) F and returning
;; another reducing function G - in other words, transducers are middleware
;; for reducing functions.
;; When invoked, G invokes F after doing its own work - usually invoking
;; the next rf in the pipeline, which could also be the last, e.g. conj
;; for a sequential output coll.
;; Transducers are thus composable and the glue of the processing pipeline.
;; They enable decoupling of processing from both input data and output format.
;; Each element goes through the entire pipeline operation at once (an added
;; performance bonus).
;; Transducers are created by transducer generating functions (factories)
;; such as #map, #filter et al.
;; In order to be API compliant a transducer must return a full, 3-arities
;; reducing function, or exceptions will occur when the transducing context
;; tries to invoke the missing arities.
;; The transducer factory fn usually takes a 1-arity function argument,
;; which is invoked on the current element within the step function.
;; A transducing context puts together a transducer and input
;; and output methods (streams, collections, channels etc.)
;; Examples are transduce, sequence, 3-arity into, eduction.
(defn comp
  "Right-to-left function composition. With a single function, returns a
  variadic wrapper around it; otherwise the returned function applies the
  composition of `fs` to its arguments and passes that result to `f`."
  [f & fs]
  (if (seq fs)
    (fn [& args]
      ;; The composition of the remaining functions is built on each call.
      (let [inner (apply comp fs)]
        (f (apply inner args))))
    (fn [& args]
      (apply f args))))
(defn reductions
  "Eagerly returns a vector of the intermediate accumulator values of
  reducing `coll` with `f`, starting with `init` itself.

  With no `init`, the first element of `coll` seeds the reduction and the
  remaining elements are folded in."
  ([f init coll]
   (reduce (fn [accs x]
             ;; accs is always a vector, so peek reads the latest
             ;; accumulator in constant time.
             (conj accs (f (peek accs) x)))
           [init]
           coll))
  ([f coll]
   (reductions f (first coll) (rest coll))))
(defn rf-addenda
  "Returns the code (as data) for the two extra arities that turn a step
  function into a reducing function: a 1-arity and a 0-arity clause, both
  delegating to the function named by `rf-sym`."
  [rf-sym]
  (let [completion (list ['acc] (list rf-sym 'acc))
        init       (list [] (list rf-sym))]
    [completion init]))
;; (as-rf ([acc x] (rf acc (f x))) my-rf)
;; => (fn ([acc x] (rf acc (f x)))
;;        ([acc] (my-rf acc))
;;        ([] (my-rf)))
(defmacro as-rf
  "Expands to a 3-arity reducing function.

  `step-fn-args+body` is a single arity clause, e.g. `([acc x] ...)`, used
  as the 2-arity step. The 1-arity and 0-arity clauses are generated and
  simply delegate to the function named by `rf-sym`."
  [step-fn-args+body rf-sym]
  ;; The two generated clauses are the same forms that
  ;; (rf-addenda rf-sym) produces; they are written inline here.
  `(fn ~step-fn-args+body
     ([~'acc] (~rf-sym ~'acc))
     ([] (~rf-sym))))
;; Functions yielding transducers
(defn mapping
  "Transducer factory: the resulting step passes `(f input)` downstream
  instead of the input itself."
  [f]
  (fn [rf]
    (as-rf ([result input]
            (rf result (f input)))
           rf)))
(defn filtering
  "Transducer factory: the resulting step forwards an element downstream
  only when `(pred x)` is truthy; otherwise the accumulator is returned
  unchanged."
  [pred]
  (fn [rf]
    (as-rf ([acc x]
            (if (pred x)
              (rf acc x)
              acc))
           rf)))
(defn removing
  "Transducer factory: the inverse of `filtering` - elements for which
  `pred` is truthy are left out."
  [pred]
  (-> pred complement filtering))
(defn mapcatting
  "Transducer factory: applies `f` to each element, expecting a collection
  back, and feeds every item of that collection downstream."
  [f]
  (fn [rf]
    (let [inner (fn [acc x]
                  (let [ret (rf acc x)]
                    ;; The inner reduce unwraps one level of `reduced`.
                    ;; Wrapping a second time lets the early-termination
                    ;; signal from downstream reach the outer reduction.
                    (if (reduced? ret)
                      (reduced ret)
                      ret)))]
      (as-rf ([acc x]
              (reduce inner acc (f x)))
             rf))))
;; utility for count-down state
(defn done-pred
  "Returns a stateful 0-arity function that answers false for its first
  `steps` invocations and true for every invocation after that."
  [steps]
  (let [ctr (atom (dec steps))]
    (fn []
      (let [n @ctr]
        (if (neg? n)
          true
          ;; Not exhausted yet: count this invocation down.
          (do (swap! ctr dec)
              false))))))
;; utility for predicate-based state that switches from on to off exactly once.
(defn on->off-pred
  "Returns a stateful 1-arity function that yields the value of `(pred v)`
  while `pred` keeps returning truthy values. The first falsy result
  switches it off, after which it returns false without calling `pred`."
  [pred]
  (let [on? (atom true)]
    (fn [v]
      (and @on?
           (let [onn? (pred v)]
             (when-not onn?
               (reset! on? false))
             onn?)))))
;; Functions yielding stateful transducers
(defn taking
  "Stateful transducer factory: forwards the first `n` elements downstream.
  The reduction is stopped with `reduced` when a further element arrives,
  so one element beyond `n` is consumed from the input before stopping."
  [n]
  (fn [rf]
    ;; State is created per application of the transducer to an rf.
    (let [done? (done-pred n)]
      (as-rf ([acc x]
              (if (done?)
                (reduced acc)
                (rf acc x)))
             rf))))
(defn taking-while
  "Stateful transducer factory: forwards elements downstream while `pred`
  holds, and stops the reduction with `reduced` at the first element for
  which it does not."
  [pred]
  (fn [rf]
    (let [on? (on->off-pred pred)]
      (as-rf ([acc x]
              (if (on? x)
                (rf acc x)
                (reduced acc)))
             rf))))
(defn dropping
  "Stateful transducer factory: ignores the first `n` elements and forwards
  every element after them downstream."
  [n]
  (fn [rf]
    (let [started? (done-pred n)]
      (as-rf ([acc x]
              (if (started?)
                (rf acc x)
                ;; Still inside the dropped prefix: leave acc untouched.
                acc))
             rf))))
(defn dropping-while
  "Stateful transducer factory: ignores elements while `pred` holds, then
  forwards the first element that fails it and every element after it."
  [pred]
  (fn [rf]
    (let [dropping? (on->off-pred pred)]
      (as-rf ([acc x]
              (if (dropping? x)
                ;; Still inside the dropped prefix: leave acc untouched.
                acc
                (rf acc x)))
             rf))))
(defn distinct
  "Stateful transducer factory: forwards an element downstream only the
  first time it is seen."
  []
  (fn [rf]
    (let [seen (atom #{})]
      (as-rf ([acc x]
              ;; contains? rather than invoking the set: calling the set
              ;; returns the element itself, which is falsy for nil and
              ;; false, so those would never register as already seen.
              (if (contains? @seen x)
                acc
                (do (swap! seen conj x)
                    (rf acc x))))
             rf))))
(defn dedupe
  "Stateful transducer factory: drops an element when it equals the element
  immediately before it."
  []
  (fn [rf]
    ;; A namespaced sentinel marks \"nothing seen yet\". Starting from nil
    ;; would wrongly drop a leading nil element.
    (let [prev (atom ::none)]
      (as-rf ([acc x]
              (if (= @prev x)
                acc
                (do (reset! prev x)
                    (rf acc x))))
             rf))))
;; Function which puts together transducers, input and output
;; xf is the transducer used (or composition of),
;; f is a reducing function aggregating the output
;; init is the initial output value; when omitted it defaults to the result of invoking 0-arity f
(defn transduce
  "Transducing context: reduces `coll` with the reducing function obtained
  by applying the transducer `xf` to `f`.

  `init` seeds the reduction; when it is omitted, `(f)` is used. The
  1-arity (completion) of the reducing function is not invoked here."
  ([xf f coll]
   (transduce xf f (f) coll))
  ([xf f init coll]
   (let [step (xf f)]
     (reduce step init coll))))
(defn map-reduced
  "Lazily maps `f` over `coll`, stopping after the first result that is
  `reduced?`. That result is included, still wrapped, as the last element.
  Returns nil for an empty `coll`."
  [f coll]
  (if (empty? coll)
    nil
    (let [[x & xs] coll
          v (f x)]
      (cons v
            (if (reduced? v)
              ;; Early termination: nothing follows a reduced value.
              nil
              (lazy-seq (map-reduced f xs)))))))
;; The #_ reader macro discards this whole form, so it is never compiled;
;; presumably an earlier draft kept for reference.
;; NOTE(review): as written the `if-not` has only one branch, so the `let`
;; would run only when x is falsy - this looks inverted or is missing a
;; branch; confirm before re-enabling.
#_(defn append [f [x & xs]]
(if-not x
(let [v (f x)]
(cons v
(lazy-seq (append f xs))))))
;;;;;; utilities and wrappers ;;;;;
(defn append
  "Reducing function that adds `x` at the end of `coll`.

  0-arity: the initial (empty) output. 1-arity: completion, returns the
  accumulated collection unchanged. 2-arity: the step.

  NOTE(review): each step wraps the accumulator in another `concat`, so
  very long outputs build deeply nested lazy seqs - confirm that output
  sizes stay modest."
  ([] [])
  ([coll] coll)
  ([coll x]
   (concat coll [x])))
(defn map
  "Applies `f` to every element of `coll` by running the `mapping`
  transducer through `transduce`, collecting the results with `append`."
  [f coll]
  (->> coll
       (transduce (mapping f) append)))
(defn filter
  "Keeps the elements of `coll` for which `f` is truthy, by running the
  `filtering` transducer through `transduce` with `append`."
  [f coll]
  (->> coll
       (transduce (filtering f) append)))
(defn mapcat
  "Applies `f` to every element of `coll` and concatenates the resulting
  collections, by running the `mapcatting` transducer through `transduce`
  with `append`."
  [f coll]
  (->> coll
       (transduce (mapcatting f) append)))
(defn take
  "Runs the `taking` transducer for `n` over `coll` through `transduce`,
  collecting the output with `append`."
  [n coll]
  (let [xform (taking n)]
    (transduce xform append coll)))
(defn drop
  "Runs the `dropping` transducer for `n` over `coll` through `transduce`,
  collecting the output with `append`."
  [n coll]
  (let [xform (dropping n)]
    (transduce xform append coll)))
(defn take-while
  "Runs the `taking-while` transducer for `pred` over `coll` through
  `transduce`, collecting the output with `append`."
  [pred coll]
  (let [xform (taking-while pred)]
    (transduce xform append coll)))
(defn drop-while
  "Runs the `dropping-while` transducer for `pred` over `coll` through
  `transduce`, collecting the output with `append`."
  [pred coll]
  (let [xform (dropping-while pred)]
    (transduce xform append coll)))
;; A sample pipeline composed from the transducers defined in this file.
;; This file's `comp` calls its first argument on the result of the rest,
;; so the first transducer listed wraps all the others and its step sees
;; each element first: elements flow through the stages top to bottom.
(def pipe-ops (comp (dropping 1000000)
(taking 50)
(filtering odd?)
;; remainder after division by one million
(mapping #(rem % 1000000))
(taking-while #(< % 10))
(dropping-while #(< % 5))
;; each surviving number n is expanded into the items of (range n)
(mapcatting range)))
(defn big-pipe []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment