Skip to content

Instantly share code, notes, and snippets.

@Losses
Created November 16, 2022 06:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Losses/3d28159c6fe05ec25c79a487cff12413 to your computer and use it in GitHub Desktop.
Save Losses/3d28159c6fe05ec25c79a487cff12413 to your computer and use it in GitHub Desktop.
Stream loading JSON and Stream saving JSON w/ `stream-json`
import fs from "fs";
import { Readable } from "stream";
import { stat, move, remove } from "fs-extra";
import Assembler from "stream-json/Assembler";
import { chain } from "stream-chain";
import { parser } from "stream-json";
import { stringer } from "stream-json/Stringer";
import { disassembler } from "stream-json/Disassembler";
export const loadJSON = async (path: string) => {
const stats = await stat(path);
if (stats.isDirectory()) {
throw new TypeError(`database is a directory, this is not allowed`);
}
if (!stats.isFile()) {
throw new TypeError(`File not exists`);
}
const pipeline = chain([fs.createReadStream(path), parser()]);
const assembler = Assembler.connectTo(pipeline);
return new Promise((resolve, reject) => {
assembler.on("done", (asm) => resolve(asm.current));
assembler.on("error", (error) => reject(error));
});
};
export const saveJSON = async (x: object, path: string) => {
const tmpPath = `${path}~`;
await remove(tmpPath);
const writeStream = fs.createWriteStream(tmpPath);
const source = new Readable({
objectMode: true,
});
source.push(x);
source.push(null);
const nextPipeline = chain(
[source, disassembler(), stringer(), writeStream],
{}
);
return new Promise((resolve, reject) => {
writeStream.on("close", async () => {
await remove(path);
await move(tmpPath, path);
await remove(tmpPath);
resolve(null);
});
writeStream.on("error", (error) => reject(error));
nextPipeline.on("error", (error) => reject(error));
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment