@laurentpetit
Forked from cgrand/mutabots.clj
Last active May 12, 2021 23:21
Reimplementation of transducers, in terms of processing functions instead of reducing functions. WIP.
(ns mutabots
"Reimplementation of transducers, in terms of processing functions instead
of reducing functions.
tl;dr: reducing-fn based transducers are a special case, influenced by reducers,
of processing-fn based transducers.
In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing
concept of reducing functions.
In short, a transducer currently has the signature:
reducing-fn -> reducing-fn
The following implementation proposes that transducers get the more general
signature of:
processing-fn -> processing-fn
A processing fn generalizes a process that consumes inputs one at a time and
cleans up / flushes at the end (when it is signalled that there is no more
input).
While the signature of a reducing-fn is ...
  (fn
    ([])           ; <- used to pass the init value down the transducer chain.
                   ;    Arity used by the transduce/into functions only.
    ([acc])        ; <- called when upstream has no more values.
                   ;    Arity used by all functions.
                   ;    The acc(umulator) value must be passed unchanged down
                   ;    the transducer chain for consumption by the final
                   ;    reducing-fn (the real and only reducing-fn collecting
                   ;    the results).
                   ;    The acc argument is only used when a collector
                   ;    reducing-fn is required (e.g. only with transduce and
                   ;    into).
    ([acc input])) ; <- called to process a new input value from upstream.
                   ;    As for the 1-arity, the acc(umulator) value must be
                   ;    passed unchanged down the transducer chain.
                   ;    Must return a reduced value if it won't accept any
                   ;    additional input.
... the signature of a processing-fn is:
  (fn
    ([])        ; <- called when upstream has no more values.
    ([input]))  ; <- called to process a new input value from upstream.
                ;    Must return a truthy value if it won't accept any
                ;    additional input, a falsy value otherwise.
Comparing transducer implementations.
Identity transducer:
  (defn rf-identity []
    (fn [rf]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc input] (rf acc input)))))
  (defn pf-identity []
    (fn [p]
      (fn
        ([] (p))
        ([input] (p input)))))
note: fewer arities. It has yet to be shown that the 0-arity of an rf-based
transducer ever needs to return anything other than (rf), or that its 1-arity
and 2-arity ever have a good reason to modify acc.
Partition-all transducer:
  ;; with current transducers
  (defn partition-all [^long n]
    (fn [rf]
      (let [a (java.util.ArrayList. n)]
        (fn
          ([] (rf))
          ([result]
           (let [result (if (.isEmpty a)
                          result
                          (let [v (vec (.toArray a))]
                            ;;clear first!
                            (.clear a)
                            (unreduced (rf result v))))]
             (rf result)))
          ([result input]
           (.add a input)
           (if (= n (.size a))
             (let [v (vec (.toArray a))]
               (.clear a)
               (rf result v))
             result))))))
  ;; with processing-based transducers
  (defn partition-all [^long n]
    (fn [p]
      (let [a (java.util.ArrayList. n)
            flush! (fn [] (let [v (vec (.toArray a))]
                            (.clear a)
                            (p v)))]
        (fn
          ([]
           (when-not (.isEmpty a) (flush!))
           (p))
          ([x]
           (.add a x)
           (when (= n (.size a)) (flush!)))))))
note: notice the call to #'unreduced in the rf-based version? Subtle bugs
waiting for you.
Also notice that the pf-based version does not need to wrap / unwrap values
with #'reduced => it just uses truthy/falsy values instead."
  (:refer-clojure
    :exclude [map filter remove
              take take-while take-nth
              drop drop-while replace
              partition-by partition-all
              transduce sequence
              keep keep-indexed cycle
              dedupe cat mapcat]))
(defn map [f]
  (fn [p]
    (fn
      ([] (p))
      ([x] (p (f x))))))
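;; Sketch (not part of the original gist): a processing fn can be driven by
;; hand, without transduce. `sink` is a hypothetical terminal processing fn
;; introduced only for this illustration; it records what it receives and
;; always asks for more input. `map` is the version defined just above.
(comment
  (let [out  (volatile! [])
        sink (fn
               ([] nil)                          ; end of input: nothing to flush
               ([x] (vswap! out conj x) false))  ; false = keep sending input
        p    ((map inc) sink)]
    (p 1)
    (p 2)
    (p)     ; signal that there is no more input
    @out)
  ;; => [2 3]
  )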
(defn filter [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (and (pred x) (p1 x))))))
(defn remove [pred] (filter (complement pred)))
(defn take [n]
  (fn [p1]
    (let [vn (volatile! (dec n))]
      (fn
        ([] (p1))
        ([x] (or (neg? @vn) (p1 x) (neg? (vswap! vn dec))))))))
(defn take-while [pred]
  (fn [p1]
    (fn
      ([] (p1))
      ([x] (if (pred x) (p1 x) true)))))
(defn take-nth [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x] (if (== @vn n)
               (do (vreset! vn 1) (p1 x))
               (do (vswap! vn inc) false)))))))
(defn drop [n]
  (fn [p1]
    (let [vn (volatile! n)]
      (fn
        ([] (p1))
        ([x] (if (pos? @vn)
               (do (vswap! vn dec) false)
               (p1 x)))))))
(defn drop-while [pred]
  (fn [p1]
    (let [vtake? (volatile! false)
          start-take? (complement pred)]
      (fn
        ([] (p1))
        ([x]
         (cond
           @vtake? (p1 x)
           (start-take? x)
           (do
             (vreset! vtake? true)
             (p1 x))))))))
(defn replace [smap]
  (map #(if-let [e (find smap %)] (val e) %)))
(defn keep [f]
  (fn [p1]
    (fn
      ([] (p1))
      ;; pass the non-nil result of (f x) downstream, as clojure.core/keep does
      ([x] (let [v (f x)] (when-not (nil? v) (p1 v)))))))
(defn keep-indexed [f]
  (fn [p1]
    (let [vi (volatile! -1)]
      (fn
        ([] (p1))
        ([x]
         (let [i (vswap! vi inc)
               v (f i x)]
           (when-not (nil? v)
             (p1 v))))))))
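(defn cycle []
  (fn [p1]
    (let [xs (java.util.ArrayList.)]
      (fn
        ([]
         ;; guard: with no recorded input there is nothing to replay, and
         ;; (.get xs 0) would throw IndexOutOfBoundsException
         (when-not (.isEmpty xs)
           (let [max (dec (.size xs))]
             (loop [i 0]
               (when-not (p1 (.get xs i))
                 (recur (if (< i max) (inc i) 0))))))
         (p1))
        ([x] (.add xs x) (p1 x))))))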
(defn partition-by [f]
  (fn [p]
    (let [a (java.util.ArrayList.)
          pv (volatile! ::none)]
      (fn
        ([]
         (when-not (.isEmpty a)
           (let [v (vec (.toArray a))]
             ;;clear first!
             (.clear a)
             (p v)))
         (p))
        ([input]
         (let [pval @pv
               val (f input)]
           (vreset! pv val)
           (if (or (identical? pval ::none)
                   (= val pval))
             (do (.add a input) false) ; .add returns true
             (let [v (vec (.toArray a))]
               (.clear a)
               (or (p v)
                   (do (.add a input) false))))))))))
(defn partition-all [^long n]
  (fn [p1]
    (let [a (java.util.ArrayList. n)
          flush! (fn [] (let [v (vec (.toArray a))]
                          (.clear a)
                          (p1 v)))]
      (fn
        ([]
         (when-not (.isEmpty a) (flush!))
         (p1))
        ([x]
         (.add a x)
         (when (= n (.size a)) (flush!)))))))
(defn dedupe []
  (fn [p1]
    (let [vprev (volatile! (Object.))]
      (fn
        ([] (p1))
        ([x] (when (not= @vprev x)
               (vreset! vprev x)
               (p1 x)))))))
(defn cat []
  (fn [p1]
    (let [rf (fn [_ x] (when (p1 x) (reduced true)))]
      (fn
        ([] (p1))
        ([c] (reduce rf false c))))))
(defn mapcat [f] (comp (map f) (cat)))
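(defn transduce [xform f init coll]
  (let [vacc (volatile! init)
        p (fn
            ([] (vswap! vacc f))
            ([x] (let [ret (f @vacc x)]
                   ;; store the unwrapped value so that a (reduced ...) result
                   ;; from f does not leak into the completion step or the
                   ;; return value
                   (vreset! vacc (unreduced ret))
                   (reduced? ret))))
        p (xform p)]
    (reduce (fn [_ x] (when (p x) (reduced nil))) ::unused coll)
    (p)
    @vacc))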
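;; Usage sketch (not part of the original gist): a few REPL calls showing how
;; the processing-fn transducers above compose with this namespace's transduce.
;; Inside this namespace, map, filter, take and partition-all refer to the
;; versions defined here, not to clojure.core's. The expected values were
;; derived by tracing the code by hand.
(comment
  (transduce (comp (filter odd?) (map inc)) + 0 (range 5))
  ;; => 6   (1 and 3 are kept, incremented to 2 and 4, then summed)

  (transduce (comp (map inc) (take 2)) conj [] [1 2 3])
  ;; => [2 3]   (take returns true after its second input, which stops the reduce)

  (transduce (partition-all 2) conj [] [1 2 3])
  ;; => [[1 2] [3]]   (the trailing partial partition is flushed by the 0-arity)
  )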
;; bleh :-(
(defn- promised-seq-proc! [pstep! p]
  (let [vp (volatile! p)]
    (fn
      ([]
       (deliver @vp nil))
      ([x]
       (deliver @vp (cons x (let [p (vreset! vp (promise))]
                              (lazy-seq (@pstep! p) @p))))
       false))))
(defn sequence [xform coll]
  (let [vcoll (volatile! coll)
        p (promise)
        pstep! (promise)
        proc! (xform (promised-seq-proc! pstep! p))
        step! (fn [p]
                (loop [coll @vcoll]
                  (if (realized? p)
                    (vreset! vcoll coll)
                    (if-let [[x :as s] (seq coll)]
                      (recur (if (proc! x) nil (rest s)))
                      (proc!)))))]
    (deliver pstep! step!) ; tying the knot
    (lazy-seq (step! p) @p)))
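;; Usage sketch (not part of the original gist): sequence returns a seq whose
;; elements are produced by feeding coll through the processing-fn chain on
;; demand. The expected values were derived by tracing the code by hand. Note
;; that cycle replays its inputs eagerly once the input is exhausted, so it
;; needs a terminating step such as take downstream.
(comment
  (sequence (comp (map inc) (filter even?)) [1 2 3 4 5])
  ;; => (2 4 6)

  (sequence (comp (cycle) (take 5)) [1 2])
  ;; => (1 2 1 2 1)
  )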
@laurentpetit

added remove, take-while, drop-while

@laurentpetit

added take-nth

@laurentpetit

added replace

@laurentpetit

fixed transduce, which was swallowing the first value of coll (implicitly mistaking it for the accumulator's initial value via the call to reduce)

@laurentpetit

bug in transduce: (p) was not being called at the end

@laurentpetit

adding 'cycle

@laurentpetit

added drop

@laurentpetit

added keep

@laurentpetit

debugged keep

@laurentpetit

added keep-indexed

@laurentpetit

added dedupe

@laurentpetit

added cat

@laurentpetit

added mapcat

@laurentpetit

added partition-all!

@laurentpetit

removed unnecessary #'unreduced call

@laurentpetit

added namespace documentation

@laurentpetit

Note to readers: WIP!

@mike-thompson-day8

detritus on line 85?

@laurentpetit

@mike-thompson-day8 fixed, thx!
