Skip to content

Instantly share code, notes, and snippets.

@dobesv
Created February 20, 2019 19:43
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dobesv/e637893adb0588a768db70e2c2e7ba29 to your computer and use it in GitHub Desktop.
Save dobesv/e637893adb0588a768db70e2c2e7ba29 to your computer and use it in GitHub Desktop.
Async iterator wrapper for papaparse
import Papa, { ParseConfig, Parser } from 'papaparse';
/**
 * Parse options that callers may pass through to Papa Parse.
 *
 * The callback options (`step`, `chunk`, `complete`, `error`) are removed
 * from the accepted keys because the iterator drives the parse through its
 * own callbacks.
 *
 * Note: `Omit` (not `Exclude`) is required here. `Exclude` filters members of
 * a union type; applied to an object type such as `ParseConfig` it leaves the
 * type unchanged and would not remove any keys.
 */
export type CsvAsyncIteratorOptions = Omit<
  ParseConfig,
  'step' | 'chunk' | 'complete' | 'error'
>;
/**
 * Helper to allow async iteration over the contents of a CSV input.
 *
 * This allows the caller to walk through the results using a `for await`
 * loop and deal with rows asynchronously, one at a time. The parser is
 * paused after every row and only resumed when the next row is requested.
 *
 * When processing a large input this reduces the memory use of the process
 * because we don't have to store the parsed representation of all the rows
 * in memory at once; just the last returned row is represented as
 * JavaScript objects.
 *
 * Also, if the input is a stream it is handed to the parser as-is instead
 * of being read into memory first. (A `Buffer` input, by contrast, is
 * decoded to a single UTF-8 string with all carriage returns removed.)
 *
 * Only one `next()` call may be pending at a time: the pending promise's
 * resolve/reject callbacks are stored in a single slot that each `next()`
 * call overwrites.
 *
 * @param input CSV content as a Buffer, a string, or a readable stream.
 * @param config Parse options forwarded to `Papa.parse`; the `error`,
 *   `step` and `complete` callbacks are always supplied by this helper.
 * @returns An async iterable yielding the parsed data of one row per step.
 */
const csvAsyncIterator = <RowType = any>(
  input: Buffer | string | NodeJS.ReadableStream,
  config?: CsvAsyncIteratorOptions,
): AsyncIterable<RowType[]> => {
  return {
    [Symbol.asyncIterator]: () => {
      // Parser handle, captured from the first `step` callback.
      let parser: Parser | null = null;
      let done = false;
      let error: any = null;
      // Settlers of the promise returned by the most recent `next()` call.
      let resolve: (r: IteratorResult<RowType[]>) => void = () => undefined;
      let reject: (err: any) => void = () => undefined;

      return {
        return: () => {
          done = true;
          if (parser) parser.abort();
          return Promise.resolve({
            done,
            value: (undefined as any) as RowType[],
          });
        },
        throw: err => {
          done = true;
          error = err;
          if (parser) parser.abort();
          return Promise.reject(err);
        },
        next: () =>
          error
            ? Promise.reject(error)
            : done
            ? Promise.resolve({
                done: true,
                value: (undefined as any) as RowType[],
              })
            : new Promise<IteratorResult<RowType[]>>((res, rej) => {
                try {
                  resolve = res;
                  reject = rej;
                  if (parser === null) {
                    // First request: start the parse. Later requests only
                    // resume the parser that was paused in `step`.
                    Papa.parse(
                      (Buffer.isBuffer(input)
                        ? // Global regex: a string pattern would strip only
                          // the first carriage return.
                          input.toString('utf8').replace(/\r/g, '')
                        : input) as any,
                      {
                        ...config,
                        error: ioError => {
                          done = true;
                          error = ioError;
                          return reject(ioError);
                        },
                        step: (results, p) => {
                          parser = p;
                          // Hold the parser until the consumer asks for
                          // the next row.
                          parser.pause();
                          if (results.errors && results.errors.length) {
                            done = true;
                            return reject(results.errors[0]);
                          } else {
                            return resolve({
                              value: (results.data as any) as RowType[],
                              done: false,
                            });
                          }
                        },
                        complete: () => {
                          done = true;
                          if (error) return reject(error);
                          else
                            return resolve({
                              done: true,
                              value: (undefined as any) as RowType[],
                            });
                        },
                      },
                    );
                  } else {
                    parser.resume();
                  }
                } catch (err) {
                  error = err;
                  return reject(err);
                }
              }),
      };
    },
  };
};
export default csvAsyncIterator;
/**
 * Wraps an async iterator so that every value it yields is passed through
 * `transform` (which may be synchronous or return a promise).
 *
 * The returned object is both an `AsyncIterator` and an `AsyncIterable`
 * (its `[Symbol.asyncIterator]()` returns itself), so it can be used
 * directly in a `for await` loop.
 *
 * If `transform` throws or rejects, the source iterator's `return()` is
 * called (when it has one) before the error is propagated. Without this the
 * source would be left open: a `for await` loop does not call `return()` on
 * an iterator whose `next()` rejected.
 *
 * @param sourceIterator Iterator whose values are transformed.
 * @param transform Mapping applied to each yielded value.
 * @returns An async iterator/iterable over the transformed values.
 */
const mapAsyncIterator = <T, U>(
  sourceIterator: AsyncIterator<T>,
  transform: (payload: T) => U | Promise<U>,
): AsyncIterator<U> & AsyncIterable<U> => {
  // Maps a yielded value; a terminal result is passed on with no value.
  const applyTransformToResult = async (
    nextResult: IteratorResult<T>,
  ): Promise<IteratorResult<U>> => {
    if (nextResult.done) {
      return { done: true, value: undefined as any };
    }
    return { done: false, value: await transform(nextResult.value) };
  };

  // Best-effort close of the source after a transform failure.
  const closeSourceQuietly = async (): Promise<void> => {
    if (!sourceIterator.return) return;
    try {
      await sourceIterator.return();
    } catch {
      // Deliberately ignored: the transform error that triggered the
      // cleanup is the one reported to the caller.
    }
  };

  // Applies the transform and closes the source if the transform fails.
  const transformOrClose = async (
    nextResult: IteratorResult<T>,
  ): Promise<IteratorResult<U>> => {
    try {
      return await applyTransformToResult(nextResult);
    } catch (err) {
      await closeSourceQuietly();
      throw err;
    }
  };

  return {
    next(): Promise<IteratorResult<U>> {
      return sourceIterator.next().then(transformOrClose);
    },
    return(arg?: T): Promise<IteratorResult<U>> {
      if (sourceIterator.return)
        return sourceIterator.return(arg).then(applyTransformToResult);
      // Source has no cleanup hook: report completion ourselves.
      return Promise.resolve({
        done: true,
        value: arg as any,
      });
    },
    throw(e?: any): Promise<IteratorResult<U>> {
      if (sourceIterator.throw)
        return sourceIterator.throw(e).then(transformOrClose);
      return Promise.reject(e);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
};
export default mapAsyncIterator;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment