Skip to content

Instantly share code, notes, and snippets.

@statico
Created March 8, 2019 23:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save statico/e2b0d2340be85c158aa079d09769c0ea to your computer and use it in GitHub Desktop.
Save statico/e2b0d2340be85c158aa079d09769c0ea to your computer and use it in GitHub Desktop.
Streaming CSV Writer / TypeScript
import * as stringify from 'csv-stringify/lib/sync'
import * as fs from 'fs-extra'
import { Writable } from 'stream'
import * as tempy from 'tempy'
/**
 * Stringifies rows one at a time and appends them to a temporary CSV file.
 *
 * The header row (the column names) is written as soon as the writer is
 * constructed. Call `close()` when finished to flush pending writes and obtain
 * the path of the temporary file. The caller is responsible for deleting that
 * file when it is no longer needed.
 */
export class StreamingCSVWriter {
  private count: number
  private readonly columns: string[]
  private readonly columnsToIndex: Map<string, number>
  private readonly path: string
  private readonly stream: Writable
  // Promise created by the first close() call; null while the writer is open.
  private closing: Promise<string> | null = null
  // First error emitted by the underlying stream, if any.
  private failure: Error | null = null

  /**
   * @param columns Column names, in output order. They are written immediately
   *   as the header row.
   */
  constructor(columns: string[]) {
    // Copy so later mutation of the caller's array cannot make error messages
    // disagree with the header that has already been written.
    this.columns = [...columns]
    this.columnsToIndex = new Map<string, number>()
    this.columns.forEach((name, index) => {
      this.columnsToIndex.set(name, index)
    })
    this.path = tempy.file({ extension: 'csv' })
    this.count = 0
    this.stream = fs.createWriteStream(this.path, { flags: 'a' })
    // Without a listener an 'error' event is thrown as an uncaught exception.
    // Remember the first failure so write() and close() can report it.
    this.stream.on('error', (err: Error) => {
      if (this.failure === null) {
        this.failure = err
      }
    })
    this.stream.write(stringify([this.columns]))
  }

  /**
   * Writes one row to the CSV.
   *
   * @param values Either an array of values for a single row
   *   (`['foo', 123]`) or a map of column names to values
   *   (`{ foo: "bar", baz: 123 }`).
   * @throws Error if the writer has been closed, if the underlying stream has
   *   failed, or if a key of `values` is not one of the configured columns.
   * @throws TypeError if `values` is neither an array nor an object.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept for backward compatibility with existing callers
  public write(values: any): void {
    if (!this.isOpen()) {
      throw new Error('Cannot write: writer has been closed')
    }
    if (this.failure !== null) {
      throw this.failure
    }
    const row: unknown[] = Array.isArray(values) ? values : this.toRow(values)
    this.stream.write(stringify([row]))
    this.count++
  }

  /** Gets the number of rows written (the header row is not counted). */
  public getCount(): number {
    return this.count
  }

  /**
   * Closes the stream and flushes all pending writes.
   *
   * Safe to call more than once: every call returns the same result.
   *
   * @returns The path of the temporary file containing the CSV.
   * @throws Rejects with the stream's error if any write, or the final flush,
   *   failed. The temporary file is not removed in that case.
   */
  public async close(): Promise<string> {
    if (this.closing !== null) {
      return this.closing
    }
    const pending = new Promise<string>((resolve, reject) => {
      if (this.failure !== null) {
        reject(this.failure)
        return
      }
      this.stream.once('error', reject)
      // Newer Node versions pass an error to the end() callback when the
      // stream fails before finishing; older versions call it with no
      // arguments, in which case the 'error' listener above reports failure.
      this.stream.end((err?: Error | null) => {
        if (err) {
          reject(err)
        } else {
          resolve(this.path)
        }
      })
    })
    this.closing = pending
    return pending
  }

  /** Returns true until `close()` has been called. */
  public isOpen(): boolean {
    return this.closing === null
  }

  // Converts a column-name -> value map into a positional row.
  private toRow(values: unknown): unknown[] {
    if (values === null || typeof values !== 'object') {
      throw new TypeError('Row must be an array of values or an object keyed by column name')
    }
    const record = values as Record<string, unknown>
    // Intentionally not pre-sized to columns.length: the row only extends to
    // the highest supplied column index, so omitted trailing columns do not
    // produce extra commas.
    const row: unknown[] = []
    for (const name of Object.keys(record)) {
      const index = this.columnsToIndex.get(name)
      if (index === undefined) {
        throw new Error(`Unknown column: '${name}' (not in: ${JSON.stringify(this.columns)})`)
      }
      row[index] = record[name]
    }
    return row
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment