How to stream large files with O(1) memory, composable transducers, and automatic resource cleanup
You need to process large files from cloud storage. Each file has a header line followed by tens of thousands of data rows. The rows must be parsed, transformed, and written to a database in batches.
The constraints are strict:
- O(1) memory: the file may have 80,000 or 500,000 rows — you can't load it into memory.
- Resource safety: streams must be closed when processing completes, even on exceptions.
- Composability: the pipeline should be built from small, reusable pieces.
Transducers are the natural tool for this in Clojure. But there's a subtlety: the stream must be opened as part of the pipeline, not before it. You don't want the caller to manage I/O resources — you want the transducer chain to be self-contained.
The key building block is a transducer that manages the lifecycle of an I/O resource. It opens the resource when an element arrives, passes it downstream, and closes it when the reduction completes:
```clojure
(defn with-open-xf
  "Transducer that opens a resource with open-fn, passes it downstream,
  and closes it when the reduction completes (including early termination
  via reduced)."
  [open-fn]
  (fn [rf]
    (let [resource (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         (when-let [r @resource]
           (.close ^java.io.Closeable r))
         (rf result))
        ([result input]
         (let [r (open-fn input)]
           (vreset! resource r)
           (rf result r)))))))
```

This is a regular Clojure transducer — it composes with `comp`, works with `transduce`, `eduction`, and `sequence`. But it does something unusual: it manages a `Closeable` resource inside the transducer chain.
The caller doesn't need to call with-open. The transducer opens the resource when it receives an input element, and closes it when the reducing function signals completion (the 1-arity call). This works because transduce guarantees completion semantics — the 1-arity rf is always called, even if the reduction is short-circuited via reduced.
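The completion guarantee is easy to see in isolation. Here is a minimal sketch (the reducing function and names are mine, invented for the check): a reducing function that records when its completing arity runs, consumed with an early-terminating `take`:

```clojure
;; Flag flipped by the completing (1-arity) call.
(def completed? (atom false))

(defn recording-rf
  ([] [])
  ([result] (reset! completed? true) result)
  ([result input] (conj result input)))

;; `take 2` short-circuits via `reduced` after two elements,
;; yet transduce still invokes the completing arity.
(transduce (take 2) recording-rf (range 1000))
;; => [0 1]

@completed?
;; => true
```

This is exactly the hook `with-open-xf` relies on: its close logic lives in the completing arity, so it runs whether the reduction finishes naturally or terminates early via `reduced`.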
With with-open-xf in hand, we can build a pipeline that goes from a file path to parsed row maps — in a single comp:
```clojure
(require '[clojure.java.io :as io]) ;; io/reader, used further below
(require 'clojure.string)

(defn parse-file [reader]
  ;; Reads the header line, returns {:header ["col1" "col2" ...], :rows <lazy-line-seq>}
  (let [lines  (line-seq reader)
        header (clojure.string/split (first lines) #";")]
    {:header header
     :rows   (rest lines)}))

(defn rows->maps [{:keys [header rows]}]
  ;; Expands file-data into a seq of row maps
  (map (fn [line]
         (zipmap (map keyword header)
                 (clojure.string/split line #";")))
       rows))
```
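To make the parsing concrete, here is a quick REPL check over an in-memory reader. The sample data and the `java.io.StringReader` usage are mine, not part of the original pipeline; the two functions are repeated so the snippet runs standalone:

```clojure
(require '[clojure.java.io :as io])
(require 'clojure.string)

;; parse-file and rows->maps repeated from above.
(defn parse-file [reader]
  (let [lines  (line-seq reader)
        header (clojure.string/split (first lines) #";")]
    {:header header
     :rows   (rest lines)}))

(defn rows->maps [{:keys [header rows]}]
  (map (fn [line]
         (zipmap (map keyword header)
                 (clojure.string/split line #";")))
       rows))

;; Two data rows behind a header line, served from memory.
(with-open [r (io/reader (java.io.StringReader. "id;name\n1;alice\n2;bob"))]
  (doall (rows->maps (parse-file r))))
;; => ({:id "1", :name "alice"} {:id "2", :name "bob"})
```

The `doall` forces the lazy row seq while the reader is still open — the same forcing that `mapcat` performs for us inside the real pipeline.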
```clojure
(defn open-rows
  "Returns an eduction: file-path → stream of parsed row maps."
  [file-path]
  (eduction
   (comp (with-open-xf cloud-storage/open-stream) ;; path → InputStream
         (with-open-xf io/reader)                 ;; InputStream → BufferedReader
         (map parse-file)                         ;; Reader → {:header .. :rows ..}
         (mapcat rows->maps))                     ;; file-data → N row maps
   [file-path]))                                  ;; source: a vector with ONE element
```

Let's unpack what happens when this eduction is consumed:
- The source is `[file-path]` — a vector containing a single element.
- `(with-open-xf cloud-storage/open-stream)` receives the file path, opens a cloud storage stream, and passes the `InputStream` downstream.
- `(with-open-xf io/reader)` receives the `InputStream`, wraps it in a `BufferedReader`, and passes it downstream.
- `(map parse-file)` reads the header and returns `{:header [...] :rows <lazy-lines>}`.
- `(mapcat rows->maps)` expands that single file-data map into N row maps — one per line.
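The whole chain can be exercised end to end without any cloud dependency. In the sketch below, `cloud-storage/open-stream` is stubbed with an in-memory `ByteArrayInputStream` (the stub and the sample data are mine); the earlier definitions are repeated so the snippet runs standalone:

```clojure
(require '[clojure.java.io :as io])
(require 'clojure.string)

;; Definitions repeated from above.
(defn with-open-xf [open-fn]
  (fn [rf]
    (let [resource (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         (when-let [r @resource]
           (.close ^java.io.Closeable r))
         (rf result))
        ([result input]
         (let [r (open-fn input)]
           (vreset! resource r)
           (rf result r)))))))

(defn parse-file [reader]
  (let [lines (line-seq reader)]
    {:header (clojure.string/split (first lines) #";")
     :rows   (rest lines)}))

(defn rows->maps [{:keys [header rows]}]
  (map #(zipmap (map keyword header) (clojure.string/split % #";")) rows))

;; Stub for cloud-storage/open-stream: serves fake file bytes from memory.
(defn open-stream [_path]
  (java.io.ByteArrayInputStream. (.getBytes "id;name\n1;alice\n2;bob")))

(defn open-rows [file-path]
  (eduction
   (comp (with-open-xf open-stream)
         (with-open-xf io/reader)
         (map parse-file)
         (mapcat rows->maps))
   [file-path]))

(into [] (open-rows "bucket/fake.csv"))
;; => [{:id "1", :name "alice"} {:id "2", :name "bob"}]
```

Note that every line is read during the `mapcat` step, while both streams are still open; the `with-open-xf` completing arities only run afterwards, when the source vector is exhausted.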
The mapcat is structurally required here. The source has one element (the file path). The with-open-xf transducers need that single element to open the streams. The expansion from 1 file to N rows must happen inside the chain — and mapcat is how you go from 1 to N in a transducer pipeline.
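In isolation, that 1 → N step looks like this (toy numbers, nothing file-specific):

```clojure
;; One input element (3) expands to three outputs; mapcat reduces over
;; the expansion and forwards each element downstream as it is produced.
(into [] (mapcat range) [3])
;; => [0 1 2]
```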
The handler consumes the eduction with transduce:
```clojure
;; A reducing function that counts processed items — retains no row data.
(defn count-rf
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

(defn ingest! [file-path]
  (transduce
   (comp (partition-all 2000)
         (map process-batch!))
   count-rf
   (open-rows file-path)))
```

`transduce` calls `.reduce()` on the `Eduction`, which is push-based: the source vector iterates over its single element and pushes it through the transducer chain. When `mapcat` expands the file-data into 80,000 rows, each row is forwarded immediately to the downstream reducing function. No intermediate buffering. The row is parsed, processed, and released before the next one is produced.
The reducing function count-rf only holds an integer counter. At any point during ingestion, there is exactly one row map in flight. Memory usage is constant regardless of file size.
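What `count-rf` observes after batching can be sanity-checked with an in-memory source; here a 10-element range stands in for the rows, and `count` stands in for `process-batch!` (both stand-ins are mine):

```clojure
(defn count-rf
  ([] 0)
  ([result _input] (inc result))
  ([result] result))

;; 10 rows in batches of 4 → partitions [0..3] [4..7] [8 9];
;; count-rf receives one input per processed batch.
(transduce (comp (partition-all 4)
                 (map count))   ;; stand-in for process-batch!
           count-rf
           (range 10))
;; => 3
```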
This architecture has several qualities that make it worth understanding:
Pure composition. Every step — opening a stream, wrapping in a reader, parsing, expanding rows — is a transducer. They compose with comp just like any other transducer. You can add a filter, a take, or a map anywhere in the chain without changing the rest.
Resource safety. The with-open-xf transducers guarantee that streams are closed when the reduction completes. No try/finally at the call site. No risk of forgetting to close a stream. The transducer chain is self-contained.
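The close guarantee is straightforward to verify with a fake `Closeable` that records when it is closed. Everything here besides `with-open-xf` (repeated so the snippet runs standalone) is invented for the check:

```clojure
;; with-open-xf repeated from above.
(defn with-open-xf [open-fn]
  (fn [rf]
    (let [resource (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         (when-let [r @resource]
           (.close ^java.io.Closeable r))
         (rf result))
        ([result input]
         (let [r (open-fn input)]
           (vreset! resource r)
           (rf result r)))))))

(def closed? (atom false))

;; A fake resource: a Closeable that flips the atom when closed.
(defn fake-open [_input]
  (reify java.io.Closeable
    (close [_] (reset! closed? true))))

(transduce (with-open-xf fake-open) (constantly nil) nil [:any-input])

@closed?
;; => true
```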
O(1) memory. Because transduce is push-based, mapcat forwards each expanded element immediately. There is no buffering — the LinkedList buffer that TransformerIterator uses in pull-based consumption is never created. One row in, one row processed, one row released.
Idiomatic Clojure. This is eduction, transduce, comp, map, mapcat — transducers the way they were designed to be used. No custom protocols, no Java interop, no mutable state (beyond with-open-xf's volatile for the resource reference).
Understanding why this is O(1) requires knowing what transduce does with an Eduction:
```
transduce on Eduction
└→ calls .reduce(f, init) on the Eduction
   └→ internally calls (transduce xf f init [file-path])
      └→ the vector [file-path] iterates: pushes file-path into xf chain
         └→ with-open-xf opens stream, pushes InputStream
            └→ with-open-xf opens reader, pushes BufferedReader
               └→ map parse-file, pushes {:header :rows}
                  └→ mapcat calls reduce on rows->maps result
                     └→ for EACH row: pushes row-map to rf
                        └→ rf processes and discards
```
The entire chain is driven by the source. Each row flows through the full pipeline before the next one starts. This is push-based reduction — the source pushes, the transducers transform, the reducing function consumes.
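The depth-first, one-element-at-a-time flow can be observed directly by logging each stage (the logging atom and `tap` helper are mine, invented for the demonstration):

```clojure
(def log (atom []))

;; A map transducer that records [stage element] and passes the element on.
(defn tap [stage]
  (map (fn [x] (swap! log conj [stage x]) x)))

;; Two stages behind a 1 → N expansion: each expanded element reaches
;; the downstream stage before the next element is even produced.
(transduce (comp (mapcat range) (tap :a) (tap :b))
           (constantly nil) nil [2])

@log
;; => [[:a 0] [:b 0] [:a 1] [:b 1]]
```

If the pipeline were batch-oriented, the log would read `[:a 0] [:a 1] [:b 0] [:b 1]` instead; the interleaved order is the push-based reduction at work.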
So far, this architecture handles a single file beautifully. But what if the requirements change — and you need to merge two files row-by-row before processing?
That's exactly what happened to us. The solution looked obvious, worked in tests, and blew up in production with an OutOfMemoryError.
In the next article, we explore that problem — and what it reveals about the difference between push-based and pull-based transducer consumption in Clojure.
Tested with Clojure 1.11 on OpenJDK 21. All code examples are simplified for clarity.