Skip to content

Instantly share code, notes, and snippets.

@ptaoussanis
Forked from laurentpetit/mutabots.clj
Last active August 29, 2015 14:07
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save ptaoussanis/c93f79ffd54a20a34cdc to your computer and use it in GitHub Desktop.
(ns mutabots
"Reimplementation of transducers, in terms of processing functions instead
of reducing functions.
tl;dr: reducing-fn based transducers are a special case, influenced by reducers,
of processing-fn based transducers.
In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing
concept of reducing functions.
To sum it up, a transducer has currently the signature :
reducing-fn -> reducing-fn
The following implementation proposes that transducers get the more general
signature of:
processing-fn -> processing-fn
With a processing fn being a generalization of a process consuming inputs,
and cleaning/flushing things at the end (when it receives the signal that there
is no more input).
While the signature of a reducing-fn is ....
(fn
([]) ; <- used to pass init value down the transducers chain.
; Arity used by the transduce/into functions only
([acc]) ; <- used when the upstream has no more value.
; Arity used by all functions
; the acc(umulator) value must be passed unchanged down
; the transducers chain for consumption by the final
; reducing-fn (the real and only reducing-fn collecting
; the results).
; the acc argument is only used when a collector reducing-fn
; is required (e.g. only with transduce and into)
([acc input])) ; <- used to process a new input value from upstream.
; as for the 1-arity, the acc(umulator) value must be passed
; unchanged down the transducers chain ...
; Must return a reduced value if it won't accept any additional
; input.
.... the signature of a processing-fn is :
(fn
([]) ; <- used when the upstream has no more value.
([input]) ; <- used to process a new input value from upstream.
; Must return true if it won't accept any additional input,
; false otherwise.
Comparing transducer implementations.
Identity transducer:
(defn rf-identity []
(fn [rf]
(fn [] (rf))
(fn [acc] (rf acc))
(fn [acc input] (rf acc input))))
(defn pf-identity []
(fn [p]
(fn [] (p))
(fn [input] (p input))))
note: simpler arity. It is yet to be proven that the 0-arity form will
ever return something different than (rf), and that 1-arity and 2-arity
can modify acc for a valuable reason.
Partition-all transducer:
;; with current transducers
(defn partition-all [^long n]
(fn [rf]
(let [a (java.util.ArrayList. n)]
(fn
([] (rf))
([result]
(let [result (if (.isEmpty a)
result
(let [v (vec (.toArray a))]
;;clear first!
(.clear a)
(unreduced (rf result v))))]
(rf result)))
([result input]
(.add a input)
(if (= n (.size a))
(let [v (vec (.toArray a))]
(.clear a)
(rf result v))
result))))))
;; with processing-based transducers
(defn partition-all [^long n]
(fn [p]
(let [a (java.util.ArrayList. n)
flush! (fn [] (let [v (vec (.toArray a))]
(.clear a)
(p v)))]
(fn
([]
(when-not (.isEmpty a) (flush!))
(p))
([x]
(.add a x)
(when (= n (.size a)) (flush!)))))))
note: notice the call to #'unreduced in the rf-based version? Subtle bugs
waiting for you.
alsos notice that the pf-based does not need to wrap / unwrap values
with #'reduced => just use truethy/falsy values instead."
(:refer-clojure
:exclude [map filter remove
take take-while take-nth
drop drop-while replace
partition-by partition-all
transduce sequence
keep keep-indexed cycle
dedupe cat mapcat]))
(defn map [f]
(fn [p]
(fn
([] (p))
([x] (p (f x))))))
(defn filter [pred]
(fn [p1]
(fn
([] (p1))
([x] (and (pred x) (p1 x))))))
(defn remove [pred] (filter (complement pred)))
(defn take [n]
(fn [p1]
(let [vn (volatile! (dec n))]
(fn
([] (p1))
([x] (or (neg? @vn) (p1 x) (neg? (vswap! vn dec))))))))
(defn take-while [pred]
(fn [p1]
(fn
([] (p1))
([x] (if (pred x) (p1 x) true)))))
(defn take-nth [n]
(fn [p1]
(let [vn (volatile! n)]
(fn
([] (p1))
([x] (if (== @vn n)
(do (vreset! vn 1) (p1 x))
(do (vswap! vn inc) false)))))))
(defn drop [n]
(fn [p1]
(let [vn (volatile! n)]
(fn
([] (p1))
([x] (if (pos? @vn)
(do (vswap! vn dec) false)
(p1 x)))))))
(defn drop-while [pred]
(fn [p1]
(let [vtake? (volatile! false)
start-take? (complement pred)]
(fn
([] (p1))
([x]
(cond
@vtake? (p1 x)
(start-take? x)
(do
(vreset! vtake? true)
(p1 x))))))))
(defn replace [smap]
(map #(if-let [e (find smap %)] (val e) %)))
(defn keep [f]
(fn [p1]
(fn
([] (p1))
([x] (let [v (f x)] (when-not (nil? v) (p1 x)))))))
(defn keep-indexed [f]
(fn [p1]
(let [vi (volatile! -1)]
(fn
([] (p1))
([x]
(let [i (vswap! vi inc)
v (f i x)]
(when-not (nil? v)
(p1 v))))))))
(defn cycle []
(fn [p1]
(let [xs (java.util.ArrayList.)]
(fn
([]
(let [max (dec (.size xs))]
(loop [i 0]
(when-not (p1 (.get xs i))
(recur (if (< i max) (inc i) 0)))))
(p1))
([x] (.add xs x) (p1 x))))))
(defn partition-by [f]
(fn [p]
(let [a (java.util.ArrayList.)
pv (volatile! ::none)]
(fn
([]
(when-not (.isEmpty a)
(let [v (vec (.toArray a))]
;;clear first!
(.clear a)
(p v)))
(p))
([input]
(let [pval @pv
val (f input)]
(vreset! pv val)
(if (or (identical? pval ::none)
(= val pval))
(do (.add a input) false) ; .add returns true
(let [v (vec (.toArray a))]
(.clear a)
(or (p v)
(do (.add a input) false))))))))))
(defn partition-all [^long n]
(fn [p1]
(let [a (java.util.ArrayList. n)
flush! (fn [] (let [v (vec (.toArray a))]
(.clear a)
(p1 v)))]
(fn
([]
(when-not (.isEmpty a) (flush!))
(p1))
([x]
(.add a x)
(when (= n (.size a)) (flush!)))))))
(defn dedupe []
(fn [p1]
(let [vprev (volatile! (Object.))]
(fn
([] (p1))
([x] (when (not= @vprev x)
(vreset! vprev x)
(p1 x)))))))
(defn cat []
(fn [p1]
(let [rf (fn [_ x] (when (p1 x) (reduced true)))]
(fn
([] (p1))
([c] (reduce rf false c))))))
(defn mapcat [f] (comp (map f) (cat)))
(defn transduce [xform f init coll]
(let [vacc (volatile! init)
p (fn
([] (vswap! vacc f))
([x] (reduced? (vreset! vacc (f @vacc x)))))
p (xform p)]
(reduce (fn [_ x] (when (p x) (reduced nil))) ::unused coll)
(p)
@vacc))
;; bleh :-(
(defn- promised-seq-proc! [pstep! p]
(let [vp (volatile! p)]
(fn
([]
(deliver @vp nil))
([x]
(deliver @vp (cons x (let [p (vreset! vp (promise))]
(lazy-seq (@pstep! p) @p))))
false))))
(defn sequence [xform coll]
(let [vcoll (volatile! coll)
p (promise)
promised-seq (lazy-seq p)
pstep! (promise)
proc! (xform (promised-seq-proc! pstep! p))
step! (fn [p]
(loop [coll @vcoll]
(if (realized? p)
(vreset! vcoll coll)
(if-let [[x :as s] (seq coll)]
(recur (if (proc! x) nil (rest s)))
(proc!)))))]
(deliver pstep! step!) ; tying the knot
(lazy-seq (step! p) @p)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment