Reimplementation of transducers, in terms of processing functions instead of reducing functions. WIP.
(ns mutabots
"Reimplementation of transducers, in terms of processing functions instead
of reducing functions.
tl;dr: reducing-fn based transducers are a special case, influenced by reducers,
of processing-fn based transducers.
In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing
concept of reducing functions.
In short, a transducer currently has the signature:

  reducing-fn -> reducing-fn

The following implementation proposes that transducers get the more general
signature:

  processing-fn -> processing-fn

A processing fn generalizes a process that consumes inputs, then cleans up or
flushes when it receives the signal that there is no more input.
While the signature of a reducing-fn is:

  (fn
    ([])            ; <- used to pass the init value down the transducer chain.
                    ;    Arity used by the transduce/into functions only.
    ([acc])         ; <- used when the upstream has no more values.
                    ;    Arity used by all functions.
                    ;    The acc(umulator) value must be passed unchanged down
                    ;    the transducer chain for consumption by the final
                    ;    reducing-fn (the real and only reducing-fn collecting
                    ;    the results).
                    ;    The acc argument is only used when a collector
                    ;    reducing-fn is required (e.g. only with transduce
                    ;    and into).
    ([acc input]))  ; <- used to process a new input value from upstream.
                    ;    As for the 1-arity, the acc(umulator) value must be
                    ;    passed unchanged down the transducer chain.
                    ;    Must return a reduced value if it won't accept any
                    ;    additional input.

the signature of a processing-fn is:

  (fn
    ([])        ; <- used when the upstream has no more values.
    ([input]))  ; <- used to process a new input value from upstream.
                ;    Must return a truthy value if it won't accept any
                ;    additional input, a falsy value otherwise.
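To make the processing-fn contract concrete, here is a minimal sketch of a
terminal processing fn and a driver loop. The names collector and feed! are
hypothetical, introduced only for this illustration; they are not part of
this namespace.

```clojure
;; Hypothetical helper: a terminal processing fn that appends every input
;; to the atom `out`, and asks upstream to stop once `limit` inputs are seen.
(defn collector [out limit]
  (fn
    ([] nil)                        ; no more input: nothing to flush here
    ([input]
     (swap! out conj input)
     (>= (count @out) limit))))     ; truthy => accepts no further input

;; Hypothetical driver: feed inputs until the processing fn signals that it
;; is done, then send the end-of-input signal exactly once.
(defn feed! [p coll]
  (reduce (fn [_ x] (when (p x) (reduced nil))) nil coll)
  (p))

(def out (atom []))
(feed! (collector out 3) (range 10))
@out ;=> [0 1 2]
```

The driver never inspects the values flowing through: it only reacts to the
truthy/falsy answer of the 1-arity and calls the 0-arity at the end.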
Comparing transducer implementations.

Identity transducer:

  (defn rf-identity []
    (fn [rf]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc input] (rf acc input)))))

  (defn pf-identity []
    (fn [p]
      (fn
        ([] (p))
        ([input] (p input)))))

note: fewer arities. It is yet to be proven that the 0-arity form will
ever return something other than (rf), or that the 1-arity and 2-arity forms
can modify acc for a valuable reason.
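One way to read the claim that reducing-fn based transducers are a special
case: any reducing fn can be turned into a terminal processing fn by keeping
the accumulator in local state. The sketch below assumes a hypothetical
helper named rf->pf; it is modeled on what the transduce function of this
namespace does internally, and is not a definitive implementation.

```clojure
;; Hypothetical adapter: wraps reducing fn `rf` as a processing fn.
;; The accumulator lives in the volatile `vacc` instead of being threaded
;; through every step of the chain.
(defn rf->pf [rf vacc]
  (fn
    ([] (vswap! vacc rf))                   ; completion arity of rf
    ([input]
     (let [ret (rf @vacc input)]
       (vreset! vacc (unreduced ret))       ; store the unwrapped value
       (reduced? ret)))))                   ; truthy => stop feeding

(def vacc (volatile! 0))
(def p (rf->pf + vacc))
(p 1)
(p 2)
(p)
@vacc ;=> 3
```

Only this last step of the chain knows about the accumulator; every
processing fn upstream of it sees inputs and nothing else.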
Partition-all transducer:

  ;; with current transducers
  (defn partition-all [^long n]
    (fn [rf]
      (let [a (java.util.ArrayList. n)]
        (fn
          ([] (rf))
          ([result]
           (let [result (if (.isEmpty a)
                          result
                          (let [v (vec (.toArray a))]
                            ;;clear first!
                            (.clear a)
                            (unreduced (rf result v))))]
             (rf result)))
          ([result input]
           (.add a input)
           (if (= n (.size a))
             (let [v (vec (.toArray a))]
               (.clear a)
               (rf result v))
             result))))))

  ;; with processing-based transducers
  (defn partition-all [^long n]
    (fn [p]
      (let [a (java.util.ArrayList. n)
            flush! (fn [] (let [v (vec (.toArray a))]
                            (.clear a)
                            (p v)))]
        (fn
          ([]
           (when-not (.isEmpty a) (flush!))
           (p))
          ([x]
           (.add a x)
           (when (= n (.size a)) (flush!)))))))
note: notice the call to #'unreduced in the rf-based version? Subtle bugs
waiting for you.
Also notice that the pf-based version does not need to wrap/unwrap values
with #'reduced: it just uses truthy/falsy values instead."
  (:refer-clojure
   :exclude [map filter remove
             take take-while take-nth
             drop drop-while replace
             partition-by partition-all
             transduce sequence
             keep keep-indexed cycle
             dedupe cat mapcat]))

(defn map [f]
  (fn [p]
    (fn
      ([] (p))
      ([x] (p (f x))))))

(defn filter [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (and (pred x) (p1 x))))))

(defn remove [pred] (filter (complement pred)))

(defn take [n]
  (fn [p1]
    ;; vn counts the inputs still to be forwarded after the current one
    (let [vn (volatile! (dec n))]
      (fn
        ([] (p1))
        ([x] (or (neg? @vn) (p1 x) (neg? (vswap! vn dec))))))))

(defn take-while [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (if (pred x) (p1 x) true)))))

(defn take-nth [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x] (if (== @vn n)
               (do (vreset! vn 1) (p1 x))
               (do (vswap! vn inc) false)))))))

(defn drop [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x] (if (pos? @vn)
               (do (vswap! vn dec) false)
               (p1 x)))))))

(defn drop-while [pred]
  (fn [p1]
    (let [vtake? (volatile! false)
          start-take? (complement pred)]
      (fn
        ([] (p1))
        ([x]
         (cond
           @vtake? (p1 x)
           (start-take? x)
           (do
             (vreset! vtake? true)
             (p1 x))))))))

(defn replace [smap]
  (map #(if-let [e (find smap %)] (val e) %)))

(defn keep [f]
  (fn [p1]
    (fn
      ([] (p1))
      ;; forward the non-nil result of (f x), as clojure.core/keep does
      ([x] (let [v (f x)] (when-not (nil? v) (p1 v)))))))

(defn keep-indexed [f]
  (fn [p1]
    (let [vi (volatile! -1)]
      (fn
        ([] (p1))
        ([x]
         (let [i (vswap! vi inc)
               v (f i x)]
           (when-not (nil? v)
             (p1 v))))))))
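
(defn cycle []
  (fn [p1]
    (let [xs (java.util.ArrayList.)]
      (fn
        ([]
         ;; Replay the buffered inputs until downstream refuses more.
         ;; An empty buffer is skipped: (.get xs 0) would throw on it.
         (when-not (.isEmpty xs)
           (let [max (dec (.size xs))]
             (loop [i 0]
               (when-not (p1 (.get xs i))
                 (recur (if (< i max) (inc i) 0))))))
         (p1))
        ([x] (.add xs x) (p1 x))))))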

(defn partition-by [f]
  (fn [p]
    (let [a (java.util.ArrayList.)
          pv (volatile! ::none)]
      (fn
        ([]
         (when-not (.isEmpty a)
           (let [v (vec (.toArray a))]
             ;;clear first!
             (.clear a)
             (p v)))
         (p))
        ([input]
         (let [pval @pv
               val (f input)]
           (vreset! pv val)
           (if (or (identical? pval ::none)
                   (= val pval))
             (do (.add a input) false) ; .add returns true
             (let [v (vec (.toArray a))]
               (.clear a)
               (or (p v)
                   (do (.add a input) false))))))))))

(defn partition-all [^long n]
  (fn [p1]
    (let [a (java.util.ArrayList. n)
          flush! (fn [] (let [v (vec (.toArray a))]
                          (.clear a)
                          (p1 v)))]
      (fn
        ([]
         (when-not (.isEmpty a) (flush!))
         (p1))
        ([x]
         (.add a x)
         (when (= n (.size a)) (flush!)))))))

(defn dedupe []
  (fn [p1]
    (let [vprev (volatile! (Object.))]
      (fn
        ([] (p1))
        ([x] (when (not= @vprev x)
               (vreset! vprev x)
               (p1 x)))))))

(defn cat []
  (fn [p1]
    (let [rf (fn [_ x] (when (p1 x) (reduced true)))]
      (fn
        ([] (p1))
        ([c] (reduce rf false c))))))

(defn mapcat [f] (comp (map f) (cat)))
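
(defn transduce [xform f init coll]
  (let [vacc (volatile! init)
        p (fn
            ([] (vswap! vacc f))
            ;; store the unwrapped value, so that a reduced result of f
            ;; never leaks into the completion call or the return value
            ([x] (let [ret (f @vacc x)]
                   (vreset! vacc (unreduced ret))
                   (reduced? ret))))
        p (xform p)]
    (reduce (fn [_ x] (when (p x) (reduced nil))) ::unused coll)
    (p)
    @vacc))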

;; bleh :-(
(defn- promised-seq-proc! [pstep! p]
  (let [vp (volatile! p)]
    (fn
      ([]
       (deliver @vp nil))
      ([x]
       (deliver @vp (cons x (let [p (vreset! vp (promise))]
                              (lazy-seq (@pstep! p) @p))))
       false))))

(defn sequence [xform coll]
  (let [vcoll (volatile! coll)
        p (promise)
        promised-seq (lazy-seq p)
        pstep! (promise)
        proc! (xform (promised-seq-proc! pstep! p))
        step! (fn [p]
                (loop [coll @vcoll]
                  (if (realized? p)
                    (vreset! vcoll coll)
                    (if-let [[x :as s] (seq coll)]
                      (recur (if (proc! x) nil (rest s)))
                      (proc!)))))]
    (deliver pstep! step!) ; tying the knot
    (lazy-seq (step! p) @p)))
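
A usage sketch may help show how these pieces fit together, including early termination and the final flush. To stay self-contained it redefines three of the transducers above under hypothetical pf- prefixed names (so that clojure.core is not shadowed), plus a hypothetical pf-transduce driver modeled on transduce above. It is an illustration under those assumptions, not part of the gist.

```clojure
;; Local copies of map, take and partition-all from the gist, renamed with a
;; pf- prefix (hypothetical names) so the example runs without the ns form.
(defn pf-map [f]
  (fn [p]
    (fn
      ([] (p))
      ([x] (p (f x))))))

(defn pf-take [n]
  (fn [p]
    (let [vn (volatile! (dec n))]
      (fn
        ([] (p))
        ([x] (or (neg? @vn) (p x) (neg? (vswap! vn dec))))))))

(defn pf-partition-all [n]
  (fn [p]
    (let [a (java.util.ArrayList.)
          flush! (fn [] (let [v (vec (.toArray a))]
                          (.clear a)
                          (p v)))]
      (fn
        ([] (when-not (.isEmpty a) (flush!)) (p))
        ([x] (.add a x) (when (= n (.size a)) (flush!)))))))

;; Driver: wraps the reducing fn f as the terminal processing fn, feeds coll
;; until some step returns a truthy value, then signals end of input.
(defn pf-transduce [xform f init coll]
  (let [vacc (volatile! init)
        p (xform (fn
                   ([] (vswap! vacc f))
                   ([x] (let [ret (f @vacc x)]
                          (vreset! vacc (unreduced ret))
                          (reduced? ret)))))]
    (reduce (fn [_ x] (when (p x) (reduced nil))) nil coll)
    (p)
    @vacc))

;; comp works unchanged: the leftmost transducer sees the inputs first.
(def xform (comp (pf-map inc) (pf-take 5) (pf-partition-all 2)))

(pf-transduce xform conj [] (range 100))
;=> [[1 2] [3 4] [5]]
```

The fifth input makes pf-take return a truthy value, which stops the driver; the end-of-input call then lets pf-partition-all flush the incomplete partition [5] before the result is read.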
@laurentpetit

commented Oct 11, 2014

added remove, take-while, drop-while

@laurentpetit

commented Oct 11, 2014

added take-nth

@laurentpetit

commented Oct 11, 2014

added replace

@laurentpetit

commented Oct 11, 2014

fixed transduce, which was swallowing coll's first value (implicitly mistaking it for the accumulator's initial value via the call to reduce)

@laurentpetit

commented Oct 11, 2014

bug in transduce: we forgot to call (p) at the end

@laurentpetit

commented Oct 11, 2014

adding 'cycle

@laurentpetit

commented Oct 11, 2014

added drop

@laurentpetit

commented Oct 11, 2014

added keep

@laurentpetit

commented Oct 11, 2014

debugged keep

@laurentpetit

commented Oct 11, 2014

added keep-indexed

@laurentpetit

commented Oct 11, 2014

added dedupe

@laurentpetit

commented Oct 11, 2014

added cat

@laurentpetit

commented Oct 11, 2014

added mapcat

@laurentpetit

commented Oct 12, 2014

added partition-all!

@laurentpetit

commented Oct 12, 2014

removed unnecessary #'unreduced call

@laurentpetit

commented Oct 12, 2014

added namespace documentation

@laurentpetit

commented Oct 12, 2014

Note to the readers : WIP !

@mike-thompson-day8

commented Oct 12, 2014

detritus on line 85?

@laurentpetit

commented Oct 13, 2014

@mike-thompson-day8 fixed, thx!
