@NicolasLambert
Last active February 9, 2026 22:34
Merging Two Streaming Sources in Clojure Without Blowing Up Memory

Why sequence + mapcat silently turns your streaming pipeline into an eager buffer, and how IReduceInit fixes it


Introduction

In the previous article, we built an elegant transducer pipeline for streaming large files: a with-open-xf transducer managing resource lifecycle inside the chain, mapcat expanding one file into N rows, and transduce consuming everything in O(1) memory. It works perfectly for a single source.

Now we need to merge two sources in lockstep — row 1 with row 1, row 2 with row 2 — before processing.

The Multi-Arity sequence Trick

If you've only ever used (sequence xf coll), you might not know that sequence accepts multiple collections:

(sequence (map vector) [:a :b :c] [1 2 3])
;; => ([:a 1] [:b 2] [:c 3])

It walks the collections in lockstep, applying the transducer to corresponding elements — essentially a transducer-powered map over multiple inputs. This is a lesser-known arity that turns out to be exactly what we need.
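Like multi-arity map, this lockstep walk stops as soon as the shortest input is exhausted — worth knowing before relying on it for a merge:

```clojure
;; Lockstep iteration stops at the shortest input, like multi-arity map:
(sequence (map vector) [:a :b] [1 2 3 4])
;; => ([:a 1] [:b 2])
```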

With our open-rows eduction from Article 1, the merge becomes a one-liner:

(sequence (map merge)
          (open-rows file-a)    ;; eduction: file-path → stream of row maps
          (open-rows file-b))   ;; eduction: file-path → stream of row maps

sequence zips the two eductions pairwise, (map merge) combines each pair of row maps into one, and the result is a lazy sequence ready to be consumed by transduce. Same open-rows, same transducer chain, no glue code. Elegant.

It works in unit tests. It works in development. It works in production with small files.

It throws OutOfMemoryError on large files — 80,000 rows on a 1.5 GB heap. The same open-rows eduction that was perfectly O(1) with transduce explodes when consumed by sequence.

This article explains why, and presents a fix using IReduceInit and explicit iterators that keeps O(1) memory for both single-source and multi-source pipelines.


The Fundamental Tension: Push vs Pull

The root cause is a fundamental asymmetry between two ways of consuming a transducer pipeline.

Push-based: eduction + reduce

When you call reduce on an Eduction, it calls .reduce(f, init), which internally runs transduce. The source drives the iteration — it pushes each element through the transducer chain.

When mapcat expands one element into N, each sub-element is forwarded immediately to the downstream reducing function. No buffering. The sub-element is produced, forwarded, and eligible for GC before the next one is created. O(1) memory.
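A quick REPL sketch (not part of the pipeline) makes the push model tangible: because the source drives the iteration, `reduced` can halt it before the rest of the expansion is ever produced:

```clojure
;; Early termination through eduction + mapcat: only the first three
;; elements of a potential 10M-element expansion are ever created.
(reduce (fn [acc x]
          (if (= 3 (count acc))
            (reduced acc)
            (conj acc x)))
        []
        (eduction (mapcat range) [10000000]))
;; => [0 1 2]
```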

Pull-based: sequence + reduce

When you call reduce on the result of sequence, you get a lazy ISeq. reduce uses seq-reduce — a first/next loop. The consumer drives the iteration, pulling one element at a time.

Under the hood, the lazy seq is backed by a TransformerIterator. When it needs an element, it calls step(), which pulls one source element through the full transducer chain. When mapcat expands it into N elements, the TransformerIterator's internal reducing function does buffer.add(o) for each one. All N elements land in a LinkedList before the first is returned. O(N) memory.

The dilemma

Merging two sources in lockstep requires pulling one element at a time from each source — that's inherently pull-based. But pull-based consumption of an eduction containing mapcat creates a TransformerIterator that eagerly buffers all expanded elements.

Pull is needed for two sources. Pull + mapcat = OOM.

See it in the REPL

Same reduce, same transducer, same source — only sequence vs eduction:

;; eduction (push-based): each element flows through immediately, no buffer
(time (reduce + 0 (eduction (mapcat range) [10000000])))
;; "Elapsed time: 143.xxx msecs"
;; => 49999995000000

;; sequence (pull-based): buffers all 10M Longs in a LinkedList first
(time (reduce + 0 (sequence (mapcat range) [10000000])))
;; "Elapsed time: 944.xxx msecs"
;; => 49999995000000

Same result, ~7x slower with sequence. The difference is pure allocation overhead: 10 million Long objects and 10 million LinkedList nodes materialized in the TransformerIterator buffer before the reduction even starts.

Prove it with a constrained heap

The timing shows the cost. To show the crash, constrain the heap:

# eduction (push-based): processes 10M elements in constant memory
$ clj -J-Xmx256m -M -e '(reduce + 0 (eduction (mapcat range) [10000000]))'
49999995000000

# sequence (pull-based): buffers 10M elements in LinkedList → OOM
$ clj -J-Xmx256m -M -e '(reduce + 0 (sequence (mapcat range) [10000000]))'
Execution error (OutOfMemoryError) at java.util.LinkedList/linkLast (LinkedList.java:154).
Java heap space

Same transducer. Same data. Same reduction. 256 MB is more than enough to sum 10 million numbers — unless you materialize all of them in a LinkedList first.


Inside the TransformerIterator

To understand why pull-based consumption buffers everything, let's look at the Clojure source.

clojure.core/sequence creates a TransformerIterator and wraps it in a chunked lazy seq. The TransformerIterator constructor (in clojure/lang/TransformerIterator.java) initializes the transducer chain with a reducing function that adds every output to an internal buffer:

// TransformerIterator constructor (simplified)
this.xf = (IFn) xform.invoke(new AFn() {
    public Object invoke(Object acc, Object o) {
        buffer = buffer.add(o);   // every transformed element goes here
        return acc;
    }
});

The step() method pulls one source element and runs it through the full chain:

private boolean step() {
    while(next == NONE) {
        if(buffer != EMPTY) {
            next = buffer.first();      // return first buffered element
            buffer = buffer.rest();
            return true;
        }
        if(completed)
            return false;               // source and buffer both exhausted
        if(sourceIter.hasNext()) {
            xf.invoke(null, sourceIter.next());  // apply full xf to ONE source element
            // all outputs land in buffer via buffer.add(o)
        } else {
            xf.invoke(null);            // completion arity — may flush into buffer
            completed = true;
        }
    }
    return true;
}

For 1-to-1 transducers (map): one input produces one buffer entry. Fine.

For 1-to-N transducers (mapcat): one input produces N buffer entries. All N elements materialized before the first is returned. The buffer is a LinkedList that grows without bound.

The critical point: any pull-based consumption of an eduction containing mapcat triggers this — whether via sequence, .iterator(), seq, or first. They all go through TransformerIterator.

In our pipeline, the source is [file-path] — one element. mapcat rows->maps expands it into 80,000 row maps. All 80,000 maps land in the LinkedList at once. Now imagine each map has 40 fields (~4 KB): that's ~320 MB materialized before the first row is processed.
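A counter makes the buffering observable — a small sketch, separate from the pipeline code: even asking for only the first element of the sequence forces the entire expansion through the chain, while the push-based equivalent produces exactly one:

```clojure
;; Pull-based: `first` triggers step(), which runs the whole mapcat
;; expansion into the buffer before anything is returned.
(def produced-pull (atom 0))
(first (sequence (comp (mapcat range)
                       (map (fn [x] (swap! produced-pull inc) x)))
                 [1000]))
@produced-pull
;; => 1000 — all expanded elements materialized for one `first`

;; Push-based: reduced halts after a single element.
(def produced-push (atom 0))
(reduce (fn [_ x] (reduced x)) nil
        (eduction (comp (mapcat range)
                        (map (fn [x] (swap! produced-push inc) x)))
                  [1000]))
@produced-push
;; => 1
```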


The Solution: Eliminate mapcat, Embrace IReduceInit

The fix addresses the root cause: remove mapcat from the transducer chain so that pull-based consumption is safe, then use IReduceInit to keep push-based reduction for the single-source path.

Before: [file-path] source + mapcat expansion

;; 1 input → N outputs via mapcat
(eduction
  (comp (with-open-xf open-stream)
        (with-open-xf io/reader)
        (map parse-file)
        (mapcat rows->maps))   ;; ← 1-to-N: dangerous with pull
  [file-path])

After: lines as source, 1-to-1 transducers, IReduceInit + Iterable

(defn open-rows [file-path]
  (let [reader    (open-reader file-path)   ;; resource opened OUTSIDE the xf chain
        file-data (parse-file reader)
        header    (:header file-data)
        xf        (map (partial zipmap (map keyword header)))
        rows      (:rows file-data)]
    (reify
      clojure.lang.IReduceInit
      (reduce [_ f init]
        ;; Push-based: transduce drives the iteration, O(1) memory
        (transduce xf (completing f) init rows))

      Iterable
      (iterator [_]
        ;; Pull-based but 1-to-1: safe for lockstep merging
        (.iterator ^Iterable (eduction xf rows)))

      java.io.Closeable
      (close [_] (.close reader)))))

Three changes, one reify:

  • No more mapcat: the source is now rows (the file's lines), not [file-path]. The transducer xf is purely 1-to-1 (map). Even if a TransformerIterator is created (via the Iterable path), it buffers at most one element.
  • No more with-open-xf: resource management moves from inside the transducer chain to with-open at the call site. The reader is opened before the reify and closed via Closeable.
  • IReduceInit instead of Seqable: for the single-source path, transduce calls .reduce() directly — fully push-based, no lazy sequence, no TransformerIterator. Iterable is there for the merge path, which needs to pull from two sources in lockstep.
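To see both consumption modes without files, here is a self-contained toy version (toy-rows is a hypothetical stand-in fed in-memory CSV lines instead of a reader, so there is no Closeable to implement):

```clojure
(require '[clojure.string :as str])

;; Toy stand-in for open-rows: same shape, but fed in-memory CSV lines.
(defn toy-rows [lines]
  (let [header (str/split (first lines) #",")
        xf     (map (partial zipmap (map keyword header)))
        rows   (map #(str/split % #",") (rest lines))]
    (reify
      clojure.lang.IReduceInit
      (reduce [_ f init] (transduce xf (completing f) init rows))
      Iterable
      (iterator [_] (.iterator ^Iterable (eduction xf rows))))))

;; Push-based path: transduce drives the reduction.
(transduce (map :a) conj [] (toy-rows ["a,b" "1,2" "3,4"]))
;; => ["1" "3"]

;; Pull-based path: explicit iterator, safe because xf is 1-to-1.
(iterator-seq (.iterator ^Iterable (toy-rows ["a,b" "1,2" "3,4"])))
;; => ({:a "1", :b "2"} {:a "3", :b "4"})
```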

Merging with explicit iterators

The merge function walks two Iterable sources via Java iterators in lockstep, returning an IReduceInit:

(defn merge-rows [iterable-a iterable-b]
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (let [it-a (.iterator ^Iterable iterable-a)
            it-b (.iterator ^Iterable iterable-b)]
        (loop [acc init]
          (if (reduced? acc)
            @acc
            (let [has-a (.hasNext it-a)
                  has-b (.hasNext it-b)]
              (if (and (not has-a) (not has-b))
                acc
                (let [val-a (when has-a (.next it-a))
                      val-b (when has-b (.next it-b))]
                  (recur (f acc (merge val-a val-b))))))))))))

No sequence. No TransformerIterator for the merge itself. No lazy sequence. Each pair of rows is pulled from the iterators (safe — 1-to-1 transducers), merged, and forwarded to the reducing function. Once processed, nothing references the row — it becomes eligible for GC immediately.

Resource management at the call site

(defn with-rows [file-path-a file-path-b f]
  (if-not file-path-b
    ;; Single source: IReduceInit passed directly to f
    (with-open [rows (open-rows file-path-a)]
      (f rows))
    ;; Merge: two Iterables merged into IReduceInit
    (with-open [rows-a (open-rows file-path-a)
                rows-b (open-rows file-path-b)]
      (f (merge-rows rows-a rows-b)))))

The callback f receives an IReduceInit. Whether it came from a single source or a merged pair, the caller doesn't need to know — it just calls transduce:

(defn count-rf
  "A reducing function that counts processed items — O(1), retains nothing."
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

(with-rows file-path-a file-path-b
  (fn [rows]
    (transduce
      (comp (partition-all 2000)
            (map process-batch!))
      count-rf
      rows)))

rows is an IReduceInit. transduce calls .reduce() on it, which is push-based all the way down — whether it's a single-source reduction or the merge-rows iterator loop. No lazy sequences anywhere in the pipeline. The callback is completely decoupled from the source topology.
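One subtlety: with partition-all in the chain, count-rf counts batches, not individual rows. A runnable sketch (count-rf repeated from above so the snippet stands alone):

```clojure
(defn count-rf
  "A reducing function that counts processed items — O(1), retains nothing."
  ([] 0)
  ([result] result)
  ([result _input] (inc result)))

;; partition-all groups rows before they reach count-rf,
;; so the result is the number of batches:
(transduce (partition-all 2000) count-rf (range 5000))
;; => 3  (two full batches of 2000 + one of 1000)
```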


Alternatives Considered

Lazy sequences everywhere

Without mapcat, a fully pull-based approach using Seqable + sequence would also work:

;; Simpler, but fragile
(reify
  clojure.lang.Seqable
  (seq [_] (sequence xf rows))
  java.io.Closeable
  (close [_] (.close reader)))

This is simpler code — one consumption interface instead of two. And it works, because the transducers are now 1-to-1, so the TransformerIterator buffer holds at most one element.

The tradeoff is head retention. Lazy sequences can be silently held in memory if any binding references the head. If someone writes (let [rows (seq src)] ...) and rows stays in scope while the sequence is consumed, every processed element is retained. With IReduceInit, this is structurally impossible — there is no lazy sequence whose head could be held.

IReduceInit is slightly more complex locally but guarantees O(1) memory globally, regardless of how the caller uses it.

core.async channels

Another approach is to keep both sources push-based by running each in its own thread, pushing to a core.async channel:

;; Each source pushes to a channel via transduce, then closes its channel
(let [ch-a (async/chan 32)
      ch-b (async/chan 32)]
  (async/thread
    (transduce xf (completing (fn [_ row] (async/>!! ch-a row))) nil source-a)
    (async/close! ch-a))
  (async/thread
    (transduce xf (completing (fn [_ row] (async/>!! ch-b row))) nil source-b)
    (async/close! ch-b))
  ;; Merger reads both channels in lockstep; a closed, drained channel yields nil
  (loop [acc init]
    (let [row-a (async/<!! ch-a)
          row-b (async/<!! ch-b)]
      (if (and (nil? row-a) (nil? row-b))
        acc
        (recur (f acc (merge row-a row-b)))))))

This avoids TransformerIterator entirely — both sources stay push-based. The merge pulls from bounded channels instead of iterators.

But it adds significant complexity:

  • Dependency: core.async must be on the classpath.
  • Threads: two extra threads per ingestion, with their lifecycle to manage.
  • Error propagation: exceptions in a source thread must be caught, forwarded to the merger, and the channels closed. This is non-trivial to get right.
  • Flow control: the bounded channel buffer is one more knob to tune — too small and producers block; too large and rows pile up in memory again.

This is overkill. Removing mapcat from the transducer chain solves the root cause without concurrency. The channels approach would make sense if the sources had to use expanding transducers — but they don't.


Results

Performance benchmarks at 80,000 rows with production heap constraints (-Xmx1536m -Xms1536m -XX:+UseG1GC):

Scenario             Before                      After                       Memory change
Multi-source merge   422 MB delta, 494 rows/s    403 MB delta, 498 rows/s    -5%
Single source        139 MB delta, 592 rows/s    95 MB delta, 602 rows/s     -32%

Throughput is unchanged. Memory consumption decreased on both paths. The multi-source path that was crashing with OutOfMemoryError now uses only 26% of the available heap.

The key difference is in the complexity class. The old architecture had O(N) memory: doubling the file size doubled the memory footprint. At 200K rows, the buffers alone would exceed 1.2 GB. At 500K rows, even a 4 GB heap wouldn't suffice.

The new architecture has O(1) memory. The same heap handles 80K, 200K, or 500K rows — only the processing time scales linearly.


Takeaways

  1. Merging two sources requires pull-based consumption. You can't drive two reduce calls in lockstep without pulling from at least one source. This forces you through TransformerIterator territory.

  2. TransformerIterator + expanding transducers (mapcat, cat) = O(N) buffering. The buffer materializes all expanded elements in a LinkedList before returning the first. This is by design — it's how the pull-based iterator adapts push-based transducers.

  3. The fix: make transducers 1-to-1. Remove mapcat by changing the source from [file-path] to the lines themselves. With only 1-to-1 transducers, the TransformerIterator buffer holds one element. Pull becomes safe.

  4. IReduceInit + Iterable gives you control over push vs pull per code path. Single-source: IReduceInit for push-based reduction. Multi-source merge: Iterable for safe pull via iterators. One reify, two consumption modes, O(1) memory on both.

  5. eduction and sequence are not interchangeable. They look similar — both take a transducer and a collection. But reduce on an eduction is push-based (calls .reduce()), while reduce on a sequence result is pull-based (calls first/next). The same transducer chain can be O(1) or O(N) depending on which one you use.

  6. Always test with production-sized data under constrained heap. This bug never appeared with 100-row test files or an 8 GB development heap. A simple clj -J-Xmx256m on your test runner would have caught it immediately.

  7. Read the Clojure source. TransformerIterator.java is only ~150 lines. Understanding step() and the buffer mechanism demystifies sequence entirely.


Tested with Clojure 1.11 on OpenJDK 21, G1GC, 1.5 GB heap. All code examples are simplified for clarity — the production codebase uses additional transducers for field normalization, encryption, and database batching, but the core architecture is as described.
