@NicolasLambert
Last active March 4, 2026 22:49

Elegant Resource-Safe Transducer Pipelines in Clojure

How to stream large files with O(1) memory, composable transducers, and automatic resource cleanup


The Problem

You need to process large files from cloud storage. Each file has a header line followed by tens of thousands of data rows. The rows must be parsed, transformed, and written to a database in batches.

The constraints are strict:

  • O(1) memory: the file may have 80,000 or 500,000 rows — you can't load it into memory.
  • Resource safety: streams must be closed when processing completes, even on exceptions.
  • Composability: the pipeline should be built from small, reusable pieces.

Transducers are the natural tool for this in Clojure. But there's a subtlety: the stream must be opened as part of the pipeline, not before it. You don't want the caller to manage I/O resources — you want the transducer chain to be self-contained.


The with-open-xf Transducer

The key building block is a transducer that manages the lifecycle of an I/O resource. It opens the resource when an element arrives, passes it downstream, and closes it when the reduction completes:

(defn with-open-xf
  "Transducer that opens a resource with open-fn, passes it downstream,
   and closes it on completion or exception."
  [open-fn]
  (fn [rf]
    (let [resource (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         (when-let [r @resource]
           (.close ^java.io.Closeable r))
         (rf result))
        ([result input]
         (let [r (open-fn input)]
           (vreset! resource r)
           (try
             (rf result r)
             (catch Throwable t
               ;; transduce does not invoke the completion arity when the
               ;; reduction throws, so close here before rethrowing.
               (.close ^java.io.Closeable r)
               (throw t)))))))))

This is a regular Clojure transducer — it composes with comp, works with transduce, eduction, and sequence. But it does something unusual: it manages a Closeable resource inside the transducer chain.

The caller doesn't need to call with-open. The transducer opens the resource when it receives an input element, and closes it when the reducing function signals completion (the 1-arity call). This works because transduce guarantees completion semantics — the 1-arity rf is always called, even if the reduction is short-circuited via reduced.
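To make that completion guarantee concrete, here is a toy transducer (the name `tracking-xf` is illustrative, not part of the pipeline) that records when its 1-arity completion fires — even when `take` short-circuits the reduction via `reduced`:

```clojure
(def completed? (atom false))

;; toy transducer that records when its completion arity runs
(defn tracking-xf [rf]
  (fn
    ([] (rf))
    ([result] (reset! completed? true) (rf result))
    ([result input] (rf result input))))

;; `take` short-circuits after two elements via `reduced`...
(transduce (comp tracking-xf (take 2)) + 0 [1 2 3 4])
;; => 3

;; ...yet the completion arity still ran:
@completed?
;; => true
```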


Composing the Full Pipeline

With with-open-xf in hand, we can build a pipeline that goes from a file path to parsed row maps — in a single comp:

(defn parse-file [reader]
  ;; Reads the header line, returns {:header ["col1" "col2" ...], :rows <lazy-line-seq>}
  (let [lines (line-seq reader)
        header (clojure.string/split (first lines) #";")]
    {:header header
     :rows   (rest lines)}))

(defn rows->maps [{:keys [header rows]}]
  ;; Expands file-data into a seq of row maps
  (map (fn [line]
         (zipmap (map keyword header)
                 (clojure.string/split line #";")))
       rows))
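A quick REPL check of these two helpers — the definitions are repeated inline so the snippet stands alone, and a `java.io.StringReader` stands in for the cloud stream:

```clojure
(require 'clojure.string)

;; the two helpers from above, repeated so this snippet runs on its own
(defn parse-file [reader]
  (let [lines (line-seq reader)
        header (clojure.string/split (first lines) #";")]
    {:header header
     :rows   (rest lines)}))

(defn rows->maps [{:keys [header rows]}]
  (map (fn [line]
         (zipmap (map keyword header)
                 (clojure.string/split line #";")))
       rows))

;; toy run on an in-memory reader standing in for the cloud stream
(with-open [r (java.io.BufferedReader.
               (java.io.StringReader. "id;name\n1;ada\n2;bob"))]
  (into [] (mapcat rows->maps) [(parse-file r)]))
;; => [{:id "1", :name "ada"} {:id "2", :name "bob"}]
```

Note the `into` runs inside `with-open`: the lazy `line-seq` must be fully consumed before the reader closes.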

(defn open-rows
  "Returns an eduction: file-path → stream of parsed row maps."
  [file-path]
  (eduction
    (comp (with-open-xf cloud-storage/open-stream)  ;; path → InputStream
          (with-open-xf io/reader)                   ;; InputStream → BufferedReader
          (map parse-file)                           ;; Reader → {:header .. :rows ..}
          (mapcat rows->maps))                       ;; file-data → N row maps
    [file-path]))  ;; source: a vector with ONE element

Let's unpack what happens when this eduction is consumed:

  1. The source is [file-path] — a vector containing a single element.
  2. with-open-xf cloud-storage/open-stream receives the file path, opens a cloud storage stream, and passes the InputStream downstream.
  3. with-open-xf io/reader receives the InputStream, wraps it in a BufferedReader, and passes it downstream.
  4. map parse-file reads the header, returns {:header [...] :rows <lazy-lines>}.
  5. mapcat rows->maps expands that single file-data map into N row maps — one per line.

The mapcat is structurally required here. The source has one element (the file path). The with-open-xf transducers need that single element to open the streams. The expansion from 1 file to N rows must happen inside the chain — and mapcat is how you go from 1 to N in a transducer pipeline.
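The 1 → N expansion is easy to see in isolation — a toy example, independent of files:

```clojure
;; one source element, expanded to N downstream elements by mapcat
(into [] (mapcat (fn [n] (range 1 (inc n)))) [3])
;; => [1 2 3]
```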


Consuming the Pipeline

The handler consumes the eduction with transduce:

;; A reducing function that counts processed items — retains no row data.
(defn count-rf
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

(defn ingest! [file-path]
  (transduce
    (comp (partition-all 2000)
          (map process-batch!))
    count-rf
    (open-rows file-path)))
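Here is the same batching shape on toy data, with a stubbed process-batch! (the real one writes to the database; this stand-in just returns the batch size). Note that count-rf counts batches, not rows:

```clojure
(defn count-rf
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

;; stand-in for the real process-batch!, which writes to the database
(defn process-batch! [batch] (count batch))

(transduce (comp (partition-all 2) (map process-batch!))
           count-rf
           (range 5))
;; => 3 — three batches: [0 1], [2 3], [4]
```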

transduce calls .reduce() on the Eduction, which is push-based: the source vector iterates over its single element and pushes it through the transducer chain. When mapcat expands the file-data into 80,000 rows, each row is forwarded immediately to the downstream reducing function. No intermediate buffering. The row is parsed, processed, and released before the next one is produced.

The reducing function count-rf only holds an integer counter. The one place rows accumulate is the partition-all buffer, which holds at most 2,000 rows at a time. Memory usage is bounded by the batch size — constant regardless of file size.


Why This Works

This architecture has several qualities that make it worth understanding:

Pure composition. Every step — opening a stream, wrapping in a reader, parsing, expanding rows — is a transducer. They compose with comp just like any other transducer. You can add a filter, a take, or a map anywhere in the chain without changing the rest.

Resource safety. The with-open-xf transducers guarantee that streams are closed when the reduction completes. No try/finally at the call site. No risk of forgetting to close a stream. The transducer chain is self-contained.

O(1) memory. Because transduce is push-based, mapcat forwards each expanded element immediately. There is no buffering — the LinkedList buffer that TransformerIterator uses in pull-based consumption is never created. One row in, one row processed, one row released.

Idiomatic Clojure. This is eduction, transduce, comp, map, mapcat — transducers the way they were designed to be used. No custom protocols, no mutable state beyond with-open-xf's volatile, and no Java interop beyond the .close call on the resource.

The push-based mechanics

Understanding why this is O(1) requires knowing what transduce does with an Eduction:

transduce on Eduction
  └→ calls .reduce(f, init) on the Eduction
       └→ internally calls (transduce xf f init [file-path])
            └→ the vector [file-path] iterates: pushes file-path into xf chain
                 └→ with-open-xf opens stream, pushes InputStream
                      └→ with-open-xf opens reader, pushes BufferedReader
                           └→ map parse-file, pushes {:header :rows}
                                └→ mapcat calls reduce on rows->maps result
                                     └→ for EACH row: pushes row-map to rf
                                          └→ rf processes and discards

The entire chain is driven by the source. Each row flows through the full pipeline before the next one starts. This is push-based reduction — the source pushes, the transducers transform, the reducing function consumes.
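The depth-first ordering is directly observable with a side-effecting sketch (toy stage names, assumed for illustration):

```clojure
(def events (atom []))

(defn log-xf
  "Transducer stage that records each element as it passes through."
  [stage]
  (map (fn [x] (swap! events conj [stage x]) x)))

(transduce (comp (log-xf :parse) (log-xf :transform))
           (completing (fn [acc x] (swap! events conj [:rf x]) acc))
           nil
           [:a :b])

@events
;; => [[:parse :a] [:transform :a] [:rf :a]
;;     [:parse :b] [:transform :b] [:rf :b]]
```

Element :a travels through every stage and reaches the reducing function before :b is even produced — the signature of push-based reduction.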


What About Two Sources?

So far, this architecture handles a single file beautifully. But what if the requirements change — and you need to merge two files row-by-row before processing?

That's exactly what happened to us. The solution looked obvious, worked in tests, and blew up in production with an OutOfMemoryError.

In the next article, we explore that problem — and what it reveals about the difference between push-based and pull-based transducer consumption in Clojure.


Tested with Clojure 1.11 on OpenJDK 21. All code examples are simplified for clarity.
