Created
February 20, 2019 19:43
-
-
Save dobesv/e637893adb0588a768db70e2c2e7ba29 to your computer and use it in GitHub Desktop.
Async iterator wrapper for papaparse
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Papa, { ParseConfig, Parser } from 'papaparse'; | |
/**
 * Papa Parse configuration accepted by `csvAsyncIterator`.
 *
 * This is `ParseConfig` with the `step`, `chunk`, `complete` and `error`
 * keys removed. `csvAsyncIterator` supplies its own `step`, `complete` and
 * `error` callbacks, so caller-provided ones would be overwritten.
 *
 * Note: `Exclude<ParseConfig, 'step' | ...>` does NOT remove keys. `Exclude`
 * filters members of a union, and an object type is not a member of a
 * string-literal union, so it evaluates to `ParseConfig` unchanged. Removing
 * keys needs `Pick` over the filtered key set (the same thing the built-in
 * `Omit` does on newer compilers).
 */
export type CsvAsyncIteratorOptions = Pick<
  ParseConfig,
  Exclude<keyof ParseConfig, 'step' | 'chunk' | 'complete' | 'error'>
>;
/**
 * Helper to allow async iteration over the contents of
 * a CSV input.
 *
 * This allows the caller to walk through the results
 * using a `for await` loop and deal with rows asynchronously,
 * one at a time.
 *
 * When processing a large input this reduces the memory
 * use of the process because we don't have to store
 * the parsed representation of all the rows in memory
 * at once; just the last returned row is represented
 * as JavaScript objects.
 *
 * Also, if the input is coming from a file or URL this
 * will avoid loading the entire file into memory.
 *
 * The combination of these two factors allows processing
 * of extremely large files without using tons of memory.
 *
 * How it works: parsing starts lazily on the first `next()` call. Every
 * time Papa Parse delivers a row through `step`, the parser is paused and
 * the pending `next()` promise is settled; the following `next()` call
 * resumes the parser to fetch one more row.
 *
 * @param input  CSV data. A `Buffer` is decoded as UTF-8 and has every
 *               carriage return removed before parsing; strings and streams
 *               are handed to Papa Parse as-is.
 * @param config Papa Parse options. The `step`, `complete` and `error`
 *               callbacks are always replaced by the iterator's own.
 * @returns An async iterable whose iterator resolves with the data of each
 *          parsed row and rejects on an I/O error or on the first row-level
 *          parse error.
 */
const csvAsyncIterator = <RowType = any>(
  input: Buffer | string | NodeJS.ReadableStream,
  config?: CsvAsyncIteratorOptions,
): AsyncIterable<RowType[]> => {
  return {
    [Symbol.asyncIterator]: () => {
      // Set by the first `step` callback; null until parsing has started.
      let parser: Parser | null = null;
      // True once iteration is over (completed, aborted or failed).
      let done = false;
      // Sticky failure: once set, every later `next()` rejects with it.
      let error: any = null;
      // Settlers of the promise returned by the `next()` call in flight.
      let resolve: (r: IteratorResult<RowType[]>) => void = () => undefined;
      let reject: (err: any) => void = () => undefined;

      // Terminal iterator result shared by every "no more rows" path.
      const finished = (): IteratorResult<RowType[]> => ({
        done: true,
        value: (undefined as any) as RowType[],
      });

      return {
        // Called when the consumer leaves the loop early (break/return).
        return: () => {
          done = true;
          if (parser) parser.abort();
          return Promise.resolve(finished());
        },
        // Called when the consumer injects an error into the iterator.
        throw: err => {
          done = true;
          error = err;
          if (parser) parser.abort();
          return Promise.reject(err);
        },
        next: (): Promise<IteratorResult<RowType[]>> => {
          if (error) return Promise.reject(error);
          if (done) return Promise.resolve(finished());

          return new Promise<IteratorResult<RowType[]>>((res, rej) => {
            try {
              resolve = res;
              reject = rej;

              if (parser !== null) {
                // Parsing already started: let it produce the next row.
                parser.resume();
                return;
              }

              Papa.parse(
                // A string pattern in `replace` only removes the FIRST match,
                // so a global regex is required to strip every carriage
                // return from the decoded buffer.
                (Buffer.isBuffer(input)
                  ? input.toString('utf8').replace(/\r/g, '')
                  : input) as any,
                {
                  ...config,
                  error: ioError => {
                    done = true;
                    error = ioError;
                    return reject(ioError);
                  },
                  step: (results, p) => {
                    parser = p;
                    // Hold the parser until the consumer asks for more.
                    p.pause();
                    if (results.errors && results.errors.length) {
                      done = true;
                      // Reject first: `abort()` triggers `complete`, which
                      // must not get to settle this promise as "done".
                      reject(results.errors[0]);
                      // The iteration is over; release the parser instead
                      // of leaving it paused forever, as `return`/`throw` do.
                      p.abort();
                      return;
                    }
                    return resolve({
                      value: (results.data as any) as RowType[],
                      done: false,
                    });
                  },
                  complete: () => {
                    done = true;
                    if (error) return reject(error);
                    return resolve(finished());
                  },
                },
              );
            } catch (err) {
              error = err;
              return reject(err);
            }
          });
        },
      };
    },
  };
};
export default csvAsyncIterator;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps an async iterator so that every value it yields is passed through
 * `transform` (which may be synchronous or return a promise).
 *
 * The returned object is both an `AsyncIterator` and an `AsyncIterable`
 * (its `[Symbol.asyncIterator]()` returns itself), so it can be used
 * directly in a `for await` loop.
 *
 * If `transform` throws or rejects, the source iterator is closed via its
 * `return()` method (when it has one) before the error is propagated.
 * `for await` does not call `return()` on an iterator whose `next()`
 * rejected, so without this the source would never get to release what it
 * holds.
 *
 * @param sourceIterator Iterator supplying the input values.
 * @param transform      Mapping applied to each yielded (non-final) value.
 * @returns An async iterator/iterable of the transformed values. Final
 *          (`done: true`) results always carry `undefined` as their value.
 */
const mapAsyncIterator = <T, U>(
  sourceIterator: AsyncIterator<T>,
  transform: (payload: T) => U | Promise<U>,
): AsyncIterator<U> & AsyncIterable<U> => {
  // Best-effort shutdown of the source after a transform failure.
  const closeSource = async (): Promise<void> => {
    if (!sourceIterator.return) return;
    try {
      await sourceIterator.return();
    } catch {
      // The transform failure is the error the caller needs to see; a
      // secondary failure while closing must not replace it.
    }
  };

  // Maps one source result to the corresponding output result.
  const applyTransformToResult = async (
    nextResult: IteratorResult<T>,
  ): Promise<IteratorResult<U>> => {
    if (nextResult.done) {
      return { done: true, value: undefined as any };
    }
    try {
      return { done: false, value: await transform(nextResult.value) };
    } catch (err) {
      await closeSource();
      throw err;
    }
  };

  return {
    next(): Promise<IteratorResult<U>> {
      return sourceIterator.next().then(applyTransformToResult);
    },
    return(arg?: T): Promise<IteratorResult<U>> {
      if (sourceIterator.return)
        return sourceIterator.return(arg).then(applyTransformToResult);
      // Source has nothing to clean up: just report completion.
      return Promise.resolve({
        done: true,
        value: arg as any,
      });
    },
    throw(e?: any): Promise<IteratorResult<U>> {
      if (sourceIterator.throw)
        return sourceIterator.throw(e).then(applyTransformToResult);
      return Promise.reject(e);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
};

export default mapAsyncIterator;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment