Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
(ns mutabots
"Reimplementation of transducers, in terms of processing functions instead
of reducing functions.
tl;dr: reducing-fn based transducers are a special case, influenced by reducers,
of processing-fn based transducers.
In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing
concept of reducing functions.
To sum it up, a transducer has currently the signature :
reducing-fn -> reducing-fn
The following implementation proposes that transducers get the more general
signature of:
processing-fn -> processing-fn
With a processing fn being a generalization of a process consuming inputs,
and cleaning/flushing things at the end (when it receives the signal that there
is no more input).
While the signature of a reducing-fn is ....
(fn
([]) ; <- used to pass init value down the transducers chain.
; Arity sometimes used by the transduce function only.
; (never used by into, sequence, chan etc.)
([acc]) ; <- used when the upstream has no more value.
; Arity used by all functions
; the acc(umulator) value must be passed unchanged down
; the transducers chain for consumption by the final
; reducing-fn (the real and only reducing-fn collecting
; the results).
; the acc argument is only used when a collector reducing-fn
; is required (e.g. only with transduce and into)
([acc input])) ; <- used to process a new input value from upstream.
; as for the 1-arity, the acc(umulator) value must be passed
; unchanged down the transducers chain ...
; Must return a reduced value if it won't accept any additional
; input.
.... the signature of a processing-fn is :
(fn
([]) ; <- used when the upstream has no more value.
([input]) ; <- used to process a new input value from upstream.
; Must return true if it won't accept any additional input,
; false otherwise.
Comparing transducer implementations.
Identity transducer:
(defn rf-identity []
(fn [rf]
(fn
([] (rf))
([acc] (rf acc))
([acc input] (rf acc input)))))
(defn pf-identity []
(fn [p]
(fn
([] (p))
([input] (p input)))))
note: simpler arities. It is yet to be proven that the 0-arity form of a
reducing fn will ever return something different from (rf), since it is not
guaranteed to be called (all stateful init must then happen at construction),
and that the 1-arity and 2-arity forms can modify acc for a valuable reason in
a composable manner.
Partition-all transducer:
;; with current transducers
(defn partition-all [^long n]
(fn [rf]
(let [a (java.util.ArrayList. n)]
(fn
([] (rf))
([result]
(let [result (if (.isEmpty a)
result
(let [v (vec (.toArray a))]
;;clear first!
(.clear a)
(unreduced (rf result v))))]
(rf result)))
([result input]
(.add a input)
(if (= n (.size a))
(let [v (vec (.toArray a))]
(.clear a)
(rf result v))
result))))))
;; with processing-based transducers
(defn partition-all [^long n]
(fn [p]
(let [a (java.util.ArrayList. n)
flush! (fn [] (let [v (vec (.toArray a))]
(.clear a)
(p v)))]
(fn
([]
(when-not (.isEmpty a) (flush!))
(p))
([x]
(.add a x)
(when (= n (.size a)) (flush!)))))))
note: notice the call to #'unreduced in the rf-based version? Subtle bugs
are waiting for you there.
Also notice that the pf-based version does not need to wrap / unwrap values
with #'reduced => it just uses truthy/falsy values instead."
(:refer-clojure
:exclude [map filter remove
take take-while take-nth
drop drop-while replace
partition-by partition-all
transduce sequence
keep keep-indexed cycle
dedupe cat mapcat]))
(defn map
  "Processing-fn transducer: applies `f` to each input and hands the result
  to the downstream processing fn. The completion signal (0-arity) is passed
  straight through. The 1-arity returns whatever downstream returns, i.e. a
  truthy value when downstream will not accept more input."
  [f]
  (fn [downstream]
    (fn
      ;; upstream is exhausted: just propagate completion
      ([] (downstream))
      ([input]
       (let [mapped (f input)]
         (downstream mapped))))))
(defn filter
  "Processing-fn transducer: forwards an input downstream only when
  `(pred input)` is truthy. Rejected inputs yield the falsy value returned by
  `pred` (meaning: keep sending input); accepted inputs yield whatever the
  downstream processing fn returns."
  [pred]
  (fn [downstream]
    (fn
      ;; completion is propagated unchanged
      ([] (downstream))
      ([input]
       (let [verdict (pred input)]
         (if verdict
           (downstream input)
           verdict))))))
(defn remove
  "Processing-fn transducer: the inverse of this namespace's `filter` --
  inputs for which `(pred input)` is truthy are dropped, all others are
  forwarded downstream."
  [pred]
  (let [keep? (complement pred)]
    (filter keep?)))
(defn take
  "Processing-fn transducer: forwards at most `n` inputs downstream.
  The 1-arity returns true as soon as the n-th input has been forwarded (or
  immediately, without forwarding, once the budget is spent); a truthy value
  coming back from downstream is returned as is."
  [n]
  (fn [downstream]
    ;; the counter starts at n-1 and goes negative once n inputs went through
    (let [budget (volatile! (dec n))]
      (fn
        ([] (downstream))
        ([input]
         (if (neg? @budget)
           true
           (let [stop (downstream input)]
             (if stop
               stop
               (neg? (vswap! budget dec))))))))))
(defn take-while
  "Processing-fn transducer: forwards inputs downstream while `(pred input)`
  is truthy. The first input that fails `pred` is not forwarded and makes the
  1-arity return true (no more input accepted)."
  [pred]
  (fn [downstream]
    (fn
      ([] (downstream))
      ([input]
       (if-not (pred input)
         true
         (downstream input))))))
(defn take-nth
  "Processing-fn transducer: forwards the first input and then every `n`-th
  input after it. Skipped inputs make the 1-arity return false; forwarded
  inputs return whatever downstream returns."
  [n]
  (fn [downstream]
    ;; counter starts at n so that the very first input is forwarded
    (let [counter (volatile! n)]
      (fn
        ([] (downstream))
        ([input]
         (let [due? (== @counter n)]
           (cond
             due? (do (vreset! counter 1)
                      (downstream input))
             :else (do (vswap! counter inc)
                       false))))))))
(defn drop
  "Processing-fn transducer: swallows the first `n` inputs (returning false
  for each) and forwards every later input downstream."
  [n]
  (fn [downstream]
    (let [to-skip (volatile! n)]
      (fn
        ([] (downstream))
        ([input]
         (if-not (pos? @to-skip)
           (downstream input)
           (do
             (vswap! to-skip dec)
             false)))))))
(defn drop-while
  "Processing-fn transducer: swallows inputs while `(pred input)` is truthy.
  From the first input that fails `pred` onwards, every input is forwarded
  downstream and `pred` is no longer consulted. Swallowed inputs yield nil."
  [pred]
  (fn [downstream]
    (let [passing? (volatile! false)]
      (fn
        ([] (downstream))
        ([input]
         (if @passing?
           (downstream input)
           ;; still dropping: switch to pass-through on the first failure
           (when-not (pred input)
             (vreset! passing? true)
             (downstream input))))))))
(defn replace
  "Processing-fn transducer built on this namespace's `map`: an input that is
  a key of `smap` is replaced by the associated value, any other input is
  forwarded unchanged."
  [smap]
  (map (fn [item]
         (let [entry (find smap item)]
           (if entry
             (val entry)
             item)))))
(defn keep
  "Processing-fn transducer: forwards the non-nil results of `(f input)`
  downstream. Inputs for which `f` returns nil are dropped and yield nil
  (keep sending input); otherwise the 1-arity returns what downstream returns.

  Fix: the result of `f` is forwarded, not the original input, in line with
  `keep-indexed` in this namespace and with clojure.core/keep."
  [f]
  (fn [p1]
    (fn
      ([] (p1))
      ([x]
       (let [v (f x)]
         ;; false is a legitimate result; only nil is discarded
         (when-not (nil? v)
           (p1 v)))))))
(defn keep-indexed
  "Processing-fn transducer: calls `(f index input)` with a zero-based index
  counting every input seen, and forwards the non-nil results downstream.
  Inputs whose result is nil are dropped and yield nil."
  [f]
  (fn [downstream]
    ;; starts at -1 so that the first increment produces index 0
    (let [index (volatile! -1)]
      (fn
        ([] (downstream))
        ([input]
         (let [result (f (vswap! index inc) input)]
           (when (some? result)
             (downstream result))))))))
(defn cycle
  "Processing-fn transducer: forwards every input downstream while recording
  it; once upstream signals completion, replays the recorded inputs in order,
  over and over, until downstream returns a truthy value, then propagates
  completion.

  Fixes over the previous version:
  - completing with no recorded input no longer throws
    IndexOutOfBoundsException (nothing is replayed, as with clojure.core/cycle
    on an empty collection);
  - if downstream already answered truthy during the input phase, nothing is
    replayed into it on completion.

  Note: the replay loop only ends when downstream returns a truthy value."
  []
  (fn [p1]
    (let [xs (java.util.ArrayList.)
          vdone? (volatile! false)]
      (fn
        ([]
         (when-not (or @vdone? (.isEmpty xs))
           (let [last-idx (dec (.size xs))]
             (loop [i 0]
               (when-not (p1 (.get xs i))
                 ;; wrap around to the first recorded input
                 (recur (if (< i last-idx) (inc i) 0))))))
         (p1))
        ([x]
         (.add xs x)
         (let [stop (p1 x)]
           ;; remember that downstream is finished so completion won't replay
           (when stop (vreset! vdone? true))
           stop))))))
(defn partition-by
  "Processing-fn transducer: groups consecutive inputs for which `(f input)`
  returns equal values and sends each finished group downstream as a vector.
  A group is emitted when the value of `f` changes; the last, possibly
  unfinished, group is emitted on completion before completion is propagated."
  [f]
  (fn [p]
    (let [buffer (java.util.ArrayList.)
          last-key (volatile! ::none) ; ::none marks that no input was seen yet
          emit! (fn []
                  ;; snapshot and clear before calling downstream
                  (let [group (vec (.toArray buffer))]
                    (.clear buffer)
                    (p group)))]
      (fn
        ([]
         (when-not (.isEmpty buffer)
           (emit!))
         (p))
        ([input]
         (let [previous @last-key
               current (f input)]
           (vreset! last-key current)
           (if-not (or (identical? previous ::none)
                       (= current previous))
             ;; key changed: flush the finished group; keep the new input only
             ;; if downstream still accepts data
             (or (emit!)
                 (do (.add buffer input) false))
             ;; same run: buffer it (.add returns true, hence the explicit false)
             (do (.add buffer input) false))))))))
(defn partition-all
  "Processing-fn transducer: collects inputs into vectors of `n` items and
  sends each full vector downstream. On completion a non-empty remainder is
  sent as a shorter vector before completion is propagated. While a vector is
  still filling up the 1-arity returns nil."
  [^long n]
  (fn [p1]
    (let [buffer (java.util.ArrayList. n)]
      (letfn [(emit! []
                ;; snapshot and clear before calling downstream
                (let [chunk (vec (.toArray buffer))]
                  (.clear buffer)
                  (p1 chunk)))]
        (fn
          ([]
           (if (.isEmpty buffer)
             nil
             (emit!))
           (p1))
          ([x]
           (.add buffer x)
           (if (= n (.size buffer))
             (emit!)
             nil)))))))
(defn dedupe
  "Processing-fn transducer: drops an input when it is equal to the input
  immediately before it. Dropped inputs yield nil; others are forwarded and
  the 1-arity returns what downstream returns."
  []
  (fn [downstream]
    ;; a fresh Object is not equal to any input, so the first one always passes
    (let [last-seen (volatile! (Object.))]
      (fn
        ([] (downstream))
        ([input]
         (if (= @last-seen input)
           nil
           (do
             (vreset! last-seen input)
             (downstream input))))))))
(defn cat
  "Processing-fn transducer: treats each input as a collection and sends its
  items downstream one by one, stopping inside the collection as soon as
  downstream returns a truthy value (the 1-arity then returns true)."
  []
  (fn [p1]
    (letfn [(push [_ item]
              ;; `reduced` short-circuits the walk over the current collection
              (if (p1 item)
                (reduced true)
                nil))]
      (fn
        ([] (p1))
        ([coll] (reduce push false coll))))))
(defn mapcat
  "Processing-fn transducer: composition of this namespace's `map` and `cat`
  -- applies `f` to each input and sends the items of the resulting
  collection downstream one by one."
  [f]
  (let [mapping (map f)
        flattening (cat)]
    (comp mapping flattening)))
(defn transduce
  "Runs the processing-fn transducer `xform` over `coll`, accumulating with
  the reducing function `f` starting from `init`, and returns the final
  accumulator.

  `f` is called as `(f acc x)` for every value reaching the end of the chain
  and once as `(f acc)` on completion. `f` may return a `reduced` value to
  stop early.

  Fixes over the previous version:
  - the form had one closing paren too many and did not read;
  - input consumption actually stops (via `reduced`) when the chain answers
    truthy, so infinite inputs terminate;
  - completion `(p)` is always signalled, also when `coll` is simply
    exhausted, so buffering stages get to flush;
  - the accumulator is returned instead of the result of the driving reduce;
  - once `f` has returned a `reduced` value, values flushed later by upstream
    stages are ignored instead of being fed to `f` again."
  [xform f init coll]
  (let [vacc (volatile! init)
        sink (fn
               ;; completion: unwrap a possible `reduced`, then let f finish
               ([] (vswap! vacc (fn [acc] (f (unreduced acc)))))
               ;; input: answer true as soon as f is done accepting values
               ([x] (or (reduced? @vacc)
                        (reduced? (vswap! vacc f x)))))
        p (xform sink)]
    ;; the reduce only drives the chain; its own accumulator is irrelevant
    (reduce (fn [_ x] (when (p x) (reduced nil))) nil coll)
    (p)
    (unreduced @vacc)))
;; bleh :-(
(defn- promised-seq-proc!
  "Terminal processing fn that materialises its inputs as a chain of promises.

  `p` is the promise for the first cell of the output; `pstep!` is a promise
  holding a one-argument function that is called with the next promise when
  the rest of the output is realised.

  1-arity: delivers to the current promise a cons of the input and a lazy
  rest backed by a fresh promise, which becomes the current one; always
  returns false. 0-arity: delivers nil to the current promise, ending the
  output."
  [pstep! p]
  (let [pending (volatile! p)]
    (fn
      ([]
       (let [tail @pending]
         (deliver tail nil)))
      ([x]
       ;; grab the promise to fulfil BEFORE swapping in the next one
       (let [head-slot @pending
             next-slot (vreset! pending (promise))
             more (lazy-seq (@pstep! next-slot) @next-slot)]
         (deliver head-slot (cons x more))
         false)))))
(defn sequence
  "Returns a lazy sequence of the values produced by running the
  processing-fn transducer `xform` over `coll`.

  The output is a chain of promises filled in by `promised-seq-proc!`. Each
  time an unrealised part of the output is forced, `step!` feeds inputs from
  the remaining collection into the chain until the awaited promise is
  realised, signalling completion when the input is exhausted or when the
  chain reports that it accepts no more input.

  Change: the unused local `promised-seq` was removed; it was never read."
  [xform coll]
  (let [vcoll (volatile! coll)  ; input not yet consumed
        p (promise)             ; first cell of the output
        pstep! (promise)        ; will hold step!, see the end of this fn
        proc! (xform (promised-seq-proc! pstep! p))
        step! (fn [p]
                (loop [coll @vcoll]
                  (if (realized? p)
                    ;; enough was produced: remember where to resume
                    (vreset! vcoll coll)
                    (if-let [[x :as s] (seq coll)]
                      ;; a truthy answer means no more input is accepted:
                      ;; continue with nil so that completion is signalled
                      (recur (if (proc! x) nil (rest s)))
                      (proc!)))))]
    (deliver pstep! step!) ; tying the knot
    (lazy-seq (step! p) @p)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.