Why sequence + mapcat silently turns your streaming pipeline into an eager buffer, and how IReduceInit fixes it
In the previous article, we built an elegant transducer pipeline for streaming large files: a with-open-xf transducer managing resource lifecycle inside the chain, mapcat expanding one file into N rows, and transduce consuming everything in O(1) memory. It works perfectly for a single source.
Now we need to merge two sources in lockstep — row 1 with row 1, row 2 with row 2 — before processing.
If you've only ever used (sequence xf coll), you might not know that sequence accepts multiple collections:
```clojure
(sequence (map vector) [:a :b :c] [1 2 3])
;; => ([:a 1] [:b 2] [:c 3])
```

It walks the collections in lockstep, applying the transducer to corresponding elements — essentially a transducer-powered map over multiple inputs. This is a lesser-known arity that turns out to be exactly what we need.
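One more property worth knowing, easy to verify at the REPL: like multi-collection map, this arity stops at the shortest input.

```clojure
;; With inputs of unequal length, sequence walks only as far as the
;; shortest collection — the same contract as multi-collection map.
(sequence (map vector) [:a :b :c] [1 2])
;; => ([:a 1] [:b 2])
```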
With our open-rows eduction from Article 1, the merge becomes a one-liner:
```clojure
(sequence (map merge)
          (open-rows file-a)  ;; eduction: file-path → stream of row maps
          (open-rows file-b)) ;; eduction: file-path → stream of row maps
```

sequence zips the two eductions pairwise, (map merge) combines each pair of row maps into one, and the result is a lazy sequence ready to be consumed by transduce. Same open-rows, same transducer chain, no glue code. Elegant.
It works in unit tests. It works in development. It works in production with small files.
It throws OutOfMemoryError on large files — 80,000 rows on a 1.5 GB heap. The same open-rows eduction that was perfectly O(1) with transduce explodes when consumed by sequence.
This article explains why, and presents a fix using IReduceInit and explicit iterators that keeps O(1) memory for both single-source and multi-source pipelines.
The root cause is a fundamental asymmetry between two ways of consuming a transducer pipeline.
When you call reduce on an Eduction, it calls .reduce(f, init), which internally runs transduce. The source drives the iteration — it pushes each element through the transducer chain.
When mapcat expands one element into N, each sub-element is forwarded immediately to the downstream reducing function. No buffering. The sub-element is produced, forwarded, and eligible for GC before the next one is created. O(1) memory.
When you call reduce on the result of sequence, you get a lazy ISeq. reduce uses seq-reduce — a first/next loop. The consumer drives the iteration, pulling one element at a time.
Under the hood, the lazy seq is backed by a TransformerIterator. When it needs an element, it calls step(), which pulls one source element through the full transducer chain. When mapcat expands it into N elements, the TransformerIterator's internal reducing function does buffer.add(o) for each one. All N elements land in a LinkedList before the first is returned. O(N) memory.
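This is easy to confirm at the REPL: pulling even a single element pays for the entire expansion of the first source element.

```clojure
;; first forces step(), which runs (mapcat range) on the one source
;; element and buffers all 100,000 outputs before returning element zero.
(first (sequence (mapcat range) [100000]))
;; => 0
```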
Merging two sources in lockstep requires pulling one element at a time from each source — that's inherently pull-based. But pull-based consumption of an eduction containing mapcat creates a TransformerIterator that eagerly buffers all expanded elements.
Pull is needed for two sources. Pull + mapcat = OOM.
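The buffering asymmetry can be observed directly. Below is a sketch: counting-mapcat and high-water-mark are illustrative probe functions, not part of the pipeline. They track how many expanded elements have been produced but not yet consumed.

```clojure
;; Probe: like (mapcat f), but increments `gap` every time an expanded
;; element is handed to the downstream reducing function.
;; Sketch only: ignores reduced? handling.
(defn counting-mapcat [f gap]
  (fn [rf]
    (fn
      ([] (rf))
      ([acc] (rf acc))
      ([acc x]
       (reduce (fn [a y]
                 (vswap! gap inc)
                 (rf a y))
               acc
               (f x))))))

;; Consume the collection built by `make-coll`, decrementing `gap` per
;; element consumed and recording the high-water mark of
;; produced-but-unconsumed elements.
(defn high-water-mark [make-coll]
  (let [gap (volatile! 0)
        hwm (volatile! 0)]
    (reduce (fn [acc _]
              (vswap! hwm max @gap)
              (vswap! gap dec)
              acc)
            nil
            (make-coll gap))
    @hwm))

(high-water-mark #(eduction (counting-mapcat range %) [100]))
;; => 1   push: each element is consumed before the next is produced

(high-water-mark #(sequence (counting-mapcat range %) [100]))
;; => 100 pull: all 100 land in the buffer before the first is consumed
```

The high-water mark is exactly the complexity class of the buffer: O(1) for the eduction, O(N) for sequence.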
Same reduce, same transducer, same source — only sequence vs eduction:
```clojure
;; eduction (push-based): each element flows through immediately, no buffer
(time (reduce + 0 (eduction (mapcat range) [10000000])))
;; "Elapsed time: 143.xxx msecs"
;; => 49999995000000

;; sequence (pull-based): buffers all 10M Longs in a LinkedList first
(time (reduce + 0 (sequence (mapcat range) [10000000])))
;; "Elapsed time: 944.xxx msecs"
;; => 49999995000000
```

Same result, ~7x slower with sequence. The difference is pure allocation overhead: 10 million Long objects and 10 million LinkedList nodes materialized in the TransformerIterator buffer before the reduction even starts.
The timing shows the cost. To show the crash, constrain the heap:
```shell
# eduction (push-based): processes 10M elements in constant memory
$ clj -J-Xmx256m -M -e '(reduce + 0 (eduction (mapcat range) [10000000]))'
49999995000000

# sequence (pull-based): buffers 10M elements in LinkedList → OOM
$ clj -J-Xmx256m -M -e '(reduce + 0 (sequence (mapcat range) [10000000]))'
Execution error (OutOfMemoryError) at java.util.LinkedList/linkLast (LinkedList.java:154).
Java heap space
```

Same transducer. Same data. Same reduction. 256 MB is more than enough to sum 10 million numbers — unless you materialize all of them in a LinkedList first.
To understand why pull-based consumption buffers everything, let's look at the Clojure source.
clojure.core/sequence creates a TransformerIterator and wraps it in a chunked lazy seq. The TransformerIterator constructor (in TransformerIterator.java) initializes the transducer chain with a reducing function that appends every output to an internal buffer:
```java
// TransformerIterator constructor (simplified)
this.xf = (IFn) xform.invoke(new AFn() {
    public Object invoke(Object acc, Object o) {
        buffer = buffer.add(o); // every transformed element goes here
        return acc;
    }
});
```

The step() method pulls one source element and runs it through the full chain:
```java
// step() (simplified)
private boolean step() {
    while (next == NONE) {
        if (buffer != EMPTY) {
            next = buffer.first();   // return first buffered element
            buffer = buffer.rest();
            return true;
        }
        if (completed)
            return false;            // source exhausted, buffer drained
        if (sourceIter.hasNext()) {
            xf.invoke(null, sourceIter.next()); // apply full xf to ONE source element
            // all outputs land in buffer via buffer.add(o)
        } else {
            completed = true;
        }
    }
    return true;
}
```

For 1-to-1 transducers (map): one input produces one buffer entry. Fine.
For 1-to-N transducers (mapcat): one input produces N buffer entries. All N elements materialized before the first is returned. The buffer is a LinkedList that grows without bound.
The critical point: any pull-based consumption of an eduction containing mapcat triggers this — whether via sequence, .iterator(), seq, or first. They all go through TransformerIterator.
In our pipeline, the source is [file-path] — one element. mapcat rows->maps expands it into 80,000 row maps. All 80,000 maps land in the LinkedList at once. Now imagine each map has 40 fields (~4 KB): that's ~320 MB materialized before the first row is processed.
The fix addresses the root cause: remove mapcat from the transducer chain so that pull-based consumption is safe, then use IReduceInit to keep push-based reduction for the single-source path.
```clojure
;; 1 input → N outputs via mapcat
(eduction
 (comp (with-open-xf open-stream)
       (with-open-xf io/reader)
       (map parse-file)
       (mapcat rows->maps)) ;; ← 1-to-N: dangerous with pull
 [file-path])
```

The rewritten open-rows opens its resource up front, so the transducer chain stays strictly 1-to-1:

```clojure
(defn open-rows [file-path]
  (let [reader    (open-reader file-path) ;; resource opened OUTSIDE the xf chain
        file-data (parse-file reader)
        header    (:header file-data)
        xf        (map (partial zipmap (map keyword header)))
        rows      (:rows file-data)]
    (reify
      clojure.lang.IReduceInit
      (reduce [_ f init]
        ;; Push-based: transduce drives the iteration, O(1) memory
        (transduce xf (completing f) init rows))
      Iterable
      (iterator [_]
        ;; Pull-based but 1-to-1: safe for lockstep merging
        (.iterator ^Iterable (eduction xf rows)))
      java.io.Closeable
      (close [_] (.close reader)))))
```

Three changes, one reify:
- No more `mapcat`: the source is now `rows` (the file's lines), not `[file-path]`. The transducer `xf` is purely 1-to-1 (`map`). Even if a `TransformerIterator` is created (via the `Iterable` path), it buffers at most one element.
- No more `with-open-xf`: resource management moves from inside the transducer chain to `with-open` at the call site. The reader is opened before the `reify` and closed via `Closeable`.
- `IReduceInit` instead of `Seqable`: for the single-source path, `transduce` calls `.reduce()` directly — fully push-based, no lazy sequence, no `TransformerIterator`. `Iterable` is there for the merge path, which needs to pull from two sources in lockstep.
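The pattern generalizes beyond file rows. As a minimal, self-contained sketch (dual-source is an illustrative name, with a plain vector standing in for parsed rows):

```clojure
;; One value, two consumption modes: push-based via IReduceInit,
;; pull-based via Iterable. The transducer is strictly 1-to-1, so the
;; pull path never buffers more than one element.
(defn dual-source [items]
  (let [xf (map inc)]
    (reify
      clojure.lang.IReduceInit
      (reduce [_ f init]
        (transduce xf (completing f) init items))
      Iterable
      (iterator [_]
        (.iterator ^Iterable (eduction xf items))))))

;; Push: clojure.core/reduce sees IReduceInit and calls .reduce directly.
(reduce + 0 (dual-source [1 2 3]))
;; => 9

;; Pull: walk it one element at a time, as the merge path does.
(iterator-seq (.iterator ^Iterable (dual-source [1 2 3])))
;; => (2 3 4)
```

Both consumption modes are backed by the same 1-to-1 transducer, so neither path can buffer more than one element.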
The merge function walks two Iterable sources via Java iterators in lockstep, returning an IReduceInit:
```clojure
(defn merge-rows [iterable-a iterable-b]
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (let [^java.util.Iterator it-a (.iterator ^Iterable iterable-a)
            ^java.util.Iterator it-b (.iterator ^Iterable iterable-b)]
        (loop [acc init]
          (if (reduced? acc)
            @acc
            (let [has-a (.hasNext it-a)
                  has-b (.hasNext it-b)]
              (if (and (not has-a) (not has-b))
                acc
                (let [val-a (when has-a (.next it-a))
                      val-b (when has-b (.next it-b))]
                  (recur (f acc (merge val-a val-b))))))))))))
```

No sequence. No TransformerIterator for the merge itself. No lazy sequence. Each pair of rows is pulled from the iterators (safe — 1-to-1 transducers), merged, and forwarded to the reducing function. Once processed, nothing references the row — it becomes eligible for GC immediately.
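One detail the loop relies on: it keeps going while either iterator has elements, handing merge a nil for the exhausted side. That is safe because clojure.core/merge treats nil as an empty map:

```clojure
;; merge with a nil argument simply keeps the other map — this is what
;; makes uneven-length inputs safe in the lockstep loop.
(merge {:a 1} nil)    ;; => {:a 1}
(merge nil {:b 2})    ;; => {:b 2}
(merge {:a 1} {:b 2}) ;; => {:a 1, :b 2}
```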
```clojure
(defn with-rows [file-path-a file-path-b f]
  (if-not file-path-b
    ;; Single source: IReduceInit passed directly to f
    (with-open [rows (open-rows file-path-a)]
      (f rows))
    ;; Merge: two Iterables merged into IReduceInit
    (with-open [rows-a (open-rows file-path-a)
                rows-b (open-rows file-path-b)]
      (f (merge-rows rows-a rows-b)))))
```

The callback f receives an IReduceInit. Whether it came from a single source or a merged pair, the caller doesn't need to know — it just calls transduce:
```clojure
(defn count-rf
  "A reducing function that counts processed items — O(1), retains nothing."
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

(with-rows file-path-a file-path-b
  (fn [rows]
    (transduce
     (comp (partition-all 2000)
           (map process-batch!))
     count-rf
     rows)))
```

rows is an IReduceInit. transduce calls .reduce() on it, which is push-based all the way down — whether it's a single-source reduction or the merge-rows iterator loop. No lazy sequences anywhere in the pipeline. The callback is completely decoupled from the source topology.
Without mapcat, a fully pull-based approach using Seqable + sequence would also work:
```clojure
;; Simpler, but fragile
(reify
  clojure.lang.Seqable
  (seq [_] (sequence xf rows))
  java.io.Closeable
  (close [_] (.close reader)))
```

This is simpler code — one protocol instead of three. And it works, because the transducers are now 1-to-1, so the TransformerIterator buffer holds at most one element.
The tradeoff is head retention. Lazy sequences can be silently held in memory if any binding references the head. If someone writes (let [rows (seq src)] ...) and rows stays in scope while the sequence is consumed, every processed element is retained. With IReduceInit, this is structurally impossible — there is no lazy sequence whose head could be held.
IReduceInit is slightly more complex locally but guarantees O(1) memory globally, regardless of how the caller uses it.
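To make head retention concrete, here is a small illustrative function (head-retaining-sum is hypothetical): because the local is used again after the reduce, the runtime cannot clear it, so every realized element stays reachable until the body exits.

```clojure
;; `xs` is used a second time after the reduce, so locals clearing cannot
;; drop it — the head of the lazy seq stays reachable, retaining every
;; element realized by the reduce. With large n this is what blows the heap.
(defn head-retaining-sum [n]
  (let [xs (sequence (map identity) (range n))]
    [(reduce + 0 xs)  ;; realizes all of xs while `xs` pins the head
     (first xs)]))    ;; this second use forces the retention

(head-retaining-sum 100)
;; => [4950 0]
```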
Another approach is to keep both sources push-based by running each in its own thread, pushing to a core.async channel:
;; Each source pushes to a channel via transduce
(let [ch-a (async/chan 32)
ch-b (async/chan 32)]
(async/thread (transduce xf (fn [_ row] (async/>!! ch-a row)) nil source-a))
(async/thread (transduce xf (fn [_ row] (async/>!! ch-b row)) nil source-b))
;; Merger reads from both channels in lockstep
(loop [acc init]
(let [row-a (async/<!! ch-a)
row-b (async/<!! ch-b)]
(recur (f acc (merge row-a row-b))))))This avoids TransformerIterator entirely — both sources stay push-based. The merge pulls from bounded channels instead of iterators.
But it adds significant complexity:

- Dependency: `core.async` must be on the classpath.
- Threads: two extra threads per ingestion, with their lifecycle to manage.
- Error propagation: exceptions in a source thread must be caught, forwarded to the merger, and the channels closed. This is non-trivial to get right.
- Flow control: the bounded channel buffer is just another parameter to tune — too small and you block producers, too large and you buffer memory.
This is overkill. Removing mapcat from the transducer chain solves the root cause without concurrency. The channels approach would make sense if the sources had to use expanding transducers — but they don't.
Performance benchmarks at 80,000 rows with production heap constraints (-Xmx1536m -Xms1536m -XX:+UseG1GC):
| Scenario | Before | After | Memory change |
|---|---|---|---|
| Multi-source merge | 422 MB delta, 494 rows/s | 403 MB delta, 498 rows/s | -5% |
| Single source | 139 MB delta, 592 rows/s | 95 MB delta, 602 rows/s | -32% |
Throughput is unchanged. Memory consumption decreased on both paths. The multi-source path that was crashing with OutOfMemoryError now uses only 26% of the available heap.
The key difference is in the complexity class. The old architecture had O(N) memory: doubling the file size doubled the memory footprint. At 200K rows, the buffers alone would exceed 1.2 GB. At 500K rows, even a 4 GB heap wouldn't suffice.
The new architecture has O(1) memory. The same heap handles 80K, 200K, or 500K rows — only the processing time scales linearly.
- Merging two sources requires pull-based consumption. You can't drive two `reduce` calls in lockstep without pulling from at least one source. This forces you through `TransformerIterator` territory.
- `TransformerIterator` + expanding transducers (`mapcat`, `cat`) = O(N) buffering. The buffer materializes all expanded elements in a `LinkedList` before returning the first. This is by design — it's how the pull-based iterator adapts push-based transducers.
- The fix: make transducers 1-to-1. Remove `mapcat` by changing the source from `[file-path]` to the lines themselves. With only 1-to-1 transducers, the `TransformerIterator` buffer holds one element. Pull becomes safe.
- `IReduceInit` + `Iterable` gives you control over push vs pull per code path. Single-source: `IReduceInit` for push-based reduction. Multi-source merge: `Iterable` for safe pull via iterators. One `reify`, two consumption modes, O(1) memory on both.
- `eduction` and `sequence` are not interchangeable. They look similar — both take a transducer and a collection. But `reduce` on an eduction is push-based (calls `.reduce()`), while `reduce` on a sequence result is pull-based (calls `first`/`next`). The same transducer chain can be O(1) or O(N) depending on which one you use.
- Always test with production-sized data under constrained heap. This bug never appeared with 100-row test files or an 8 GB development heap. A simple `clj -J-Xmx256m` on your test runner would have caught it immediately.
- Read the Clojure source. `TransformerIterator.java` is only ~150 lines. Understanding `step()` and the buffer mechanism demystifies `sequence` entirely.
Tested with Clojure 1.11 on OpenJDK 21, G1GC, 1.5 GB heap. All code examples are simplified for clarity — the production codebase uses additional transducers for field normalization, encryption, and database batching, but the core architecture is as described.