Skip to content

Instantly share code, notes, and snippets.

@abdulloooh
Last active April 22, 2023 23:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save abdulloooh/d4c881368971b40c35aded364c45513d to your computer and use it in GitHub Desktop.
Save abdulloooh/d4c881368971b40c35aded364c45513d to your computer and use it in GitHub Desktop.
Advanced Node.js Streams
// BUFFER
// Load the whole content into a buffer (into memory) once before writing it out to the user
const http = require("http");
const media = "./testvid.mp4";
const fs = require("fs");
http
  .createServer((req, res) => {
    fs.readFile(media, (err, data) => {
      if (err) {
        // Fail the request explicitly instead of sending a 200 with an empty body
        console.log({ err });
        res.writeHead(500, { "Content-Type": "text/plain" });
        return res.end("Internal Server Error");
      }
      // writeHead is the documented API; writeHeader is a legacy alias
      res.writeHead(200, { "Content-Type": "video/mp4" });
      res.end(data);
    });
  })
  .listen(3000, () => {
    console.log("buffer - port 3000");
  });
// STREAM
// Read the content chunk by chunk instead of buffering the whole file
const http = require("http");
const fs = require("fs");
const media = "./testvid.mp4";
http
  .createServer((req, res) => {
    res.writeHead(200, { "Content-Type": "video/mp4" });
    // Attach the error handler to the read stream itself:
    // pipe() returns the DESTINATION, so chaining .on("error") after
    // .pipe(res) would listen on res, not on the file stream.
    fs.createReadStream(media)
      .on("error", (err) => {
        console.log(err);
        res.end();
      })
      .pipe(res);
  })
  .listen(3000, () => console.log("stream - port 3000"));
// CREATING a readable stream from scratch
const { stdout } = require("process");
const { Readable } = require("stream");
// sample data: each array element will be emitted as one stream chunk
const peaks = ["adee", "iee", "yakudh", "sulop", "potilo", "wertu"];
/**
 * Readable stream that emits the elements of an array, one per chunk.
 * Runs in object mode so each pushed value keeps its original type.
 */
class StreamFromArray extends Readable {
  constructor(array) {
    // Readable accepts options such as { encoding: "utf-8" } or
    // { objectMode: true }; object mode lets us push non-buffer values
    // (e.g. this.push({ index, chunk }) would also work here).
    super({ objectMode: true });
    this.array = array;
    this.index = 0;
  }

  _read() {
    // Pushing null signals end-of-stream once the array is exhausted.
    if (this.index >= this.array.length) {
      this.push(null);
      return;
    }
    this.push(this.array[this.index]);
    this.index += 1;
  }
}
// Fixed inconsistent spelling: the variable was declared as `peakStrem`
// but referenced as `peakStream` below, which threw a ReferenceError.
const peakStream = new StreamFromArray(peaks);
/**
 * _Events_
 * data, error, close, end, pause, readable, resume
 */
peakStream.on("data", (chunk) => console.log("\nchunk ", chunk, "\n"));
// can check size by reading length of the chunk (string length here)
peakStream.on("data", (chunk) => console.log(chunk.length));
peakStream.on("end", () => console.log("DONE!"));
peakStream.pipe(stdout); // cos stdout is writable
// USING existing READABLE STREAMS
const fs = require("fs");
const readStream = fs.createReadStream(
"./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
// FLOWING MODE: attaching a "data" listener switches the stream to flowing mode,
// so chunks are delivered automatically as fast as they can be read
readStream.on("data", console.log);
readStream.on("end", () => console.log("Done!"));
readStream.on("error", console.log);
// NON-FLOWING (paused) MODE: pause() stops the automatic "data" flow;
// chunks must now be pulled explicitly with read()
readStream.pause();
// stdin is a readable stream, can read from its client
// process.stdin.on("data", (chunk) => {
// console.log("echo: ", chunk.toString());
// });
// use stdin to control readStream: any input pulls one chunk via read();
// typing "finish" switches the stream back to flowing mode
process.stdin.on("data", (chunk) => {
if (chunk.toString().trim() !== "finish") return readStream.read();
// RETURN TO FLOWING MODE
readStream.resume();
});
/**
* Writable streams can be used to catch data from a readable stream and send it out
* They are ALSO everywhere; fs,stdout, npm modules etc
*/
/**
 * Copy a file by listening for "data" events on a read stream and
 * writing each chunk into a write stream.
 * (No backpressure handling here — that is introduced further below.)
 */
const { createReadStream, createWriteStream } = require("fs");
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
const writeStream = createWriteStream("./copy1.mp4");

readStream.on("data", (chunk) => writeStream.write(chunk));
readStream.on("error", console.log);
// close the write side once the read side is exhausted
readStream.on("end", () => writeStream.end());

writeStream.on("close", () => {
  // stdout is itself a writable stream
  process.stdout.write("File copied\n");
});
writeStream.on("error", console.log);
/**
* DIFFERENCE BETWEEN stream data and pipe
* Roughly same, Pipe and Unpipe and instances of Event-emitter
BUT from https://stackoverflow.com/questions/53774176/difference-between-pipe-and-stream-in-node-js
You should use the pipe method because the flow of data will be automatically managed so that
the destination Writable stream is not overwhelmed by a faster Readable stream.
If your readable stream is faster than the writable stream then you may experience data loss in dest.write(data) method
so better you should use src.pipe(des);
*/
/**
* Handling Backpressure
*
* 2 ways, not essentially mutually exclusive
*
* 1. Pause and Drain
* 2. Set big highWaterMark
*
 * highWaterMark is like using a massive hose when piping water from one bucket to another:
 * the bigger the hose, the less the backpressure. A normal-sized hose might however cause backpressure
 * if water is poured very fast from the source, hence the need to pause the pour and drain the hose.
 *
 * But the larger the highWaterMark, the more memory it consumes.
*/
/**
 * Manual backpressure handling: pause the reader whenever write() reports
 * that the writable's internal buffer is full, and resume on "drain".
 */
const { createReadStream, createWriteStream } = require("fs");
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
// A large highWaterMark enlarges the writable's buffer, so backpressure
// occurs less often — at the cost of more memory.
const writeStream = createWriteStream("./copy1.mp4", {
  highWaterMark: 12982818,
});

readStream.on("data", (chunk) => {
  // write() returns false once the internal buffer exceeds highWaterMark
  if (!writeStream.write(chunk)) {
    console.log("backpressure");
    readStream.pause();
  }
});
readStream.on("error", console.log);
readStream.on("end", () => {
  writeStream.end();
});

// "drain" fires when the buffered data has been flushed; safe to resume
writeStream.on("drain", () => {
  console.log("drained");
  readStream.resume();
});
writeStream.on("close", () => {
  process.stdout.write("File copied\n"); //stdout is also a writeStream
});
writeStream.on("error", console.log);
/**
* With pipe, it handles all the backpressure, draining and resuming etc with lesser code
* pipe function can pass data from any readableStream to any writableStream
*/
// pipe() manages backpressure (pause/drain/resume) automatically.
const { createReadStream, createWriteStream } = require("fs");
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
const writeStream = createWriteStream("./copy1.mp4");

// pipe() returns the destination, so these handlers attach to writeStream
readStream
  .pipe(writeStream)
  .on("close", () => console.log("File Copied!"))
  .on("error", console.error);

// let's write text: anything typed (or unix-piped) into stdin
// is forwarded into file1.txt
const textStream = createWriteStream("./file1.txt");
process.stdin.pipe(textStream);
/**
* We can write into textStream from console in several ways
*
* 1. Allow readStream to open and enter your data, then it is auto piped into textStream
* 2. echo on console and use `unix` pipe into our node process e.g echo "hello world" | node .
* 3. Read from a file on console and unix pipe it into node program e.g cat ../sample.txt | node .
*
* Basically, data is passed into node program from all the 3 above, get into the readable stdin
* then piped into writable textStream
*/
/**
* DUPLEX STREAM
*
* readStream can pipe into it, and it can pipe into writeStream
* It is a middle section.
* Think of it as a middleware that can be utilized to do some in-processing e.g
* - Report on data being sent, like total size moved
* - Throttle data passage i.e slow down
*/
const { Duplex, PassThrough } = require("stream");
const { createReadStream, createWriteStream } = require("fs");
// source and destination for the throttled copy below
const readStream = createReadStream(
"./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
const writeStream = createWriteStream("./copy1.mp4");
/**
 * Duplex stream that delays each chunk by `delay_ms` milliseconds,
 * throttling the flow between a readable source and a writable sink.
 */
class Throttle extends Duplex {
  constructor(delay_ms) {
    super();
    this.delay = delay_ms;
  }
  _write(chunk, encoding, callback) {
    this.push(chunk);
    // Delaying the callback makes the upstream pipe wait before sending more.
    setTimeout(callback, this.delay);
  }
  // Data is pushed from _write, so there is nothing to do on demand.
  _read(size) {}
  _final(callback) {
    // End the readable side, then signal that the writable side is done.
    // The callback MUST be invoked here: without it "finish" is never
    // emitted and downstream streams never close.
    this.push(null);
    callback();
  }
}
// PassThrough is a no-op duplex stream — useful as a tap for reporting
// on the data flowing through the pipeline.
const report = new PassThrough();
const throttle = new Throttle(100);
readStream.pipe(throttle).pipe(report).pipe(writeStream);

let totalSize = 0;
report.on("data", (chunk) => {
  totalSize += chunk.length;
  console.log(`_Total moved_: ${totalSize} bytes`);
});
/**
* TRANSFORM STREAM
*
* Transform Stream is a form of Duplex stream likewise that sits as middle line
* Basically used to transform data from readStream before passing to writeStream e.g
* replace some characters
* encrypt/decrypt
* compress etc
*/
const { stdin, stdout } = require("process");
const { Transform } = require("stream");
/**
 * Transform stream that replaces every word character ([A-Za-z0-9_])
 * in the incoming text with a single replacement character.
 */
class ReplaceText extends Transform {
  constructor(char) {
    super();
    this.replaceChar = char;
  }
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().replace(/\w/g, this.replaceChar));
    callback();
  }
  _flush(callback) {
    // _flush runs after the last chunk and before "end". There is no
    // buffered data to emit here, but the callback must still be
    // invoked — otherwise the stream never finishes.
    callback();
  }
}
// Echo stdin to stdout with every word character replaced by "x".
const xTreme = new ReplaceText("x");
stdin.pipe(xTreme).pipe(stdout);
console.log("start streaming data in i.e start typing");
/**
Crypto is a built-in Node.js module that provides transform streams that can encrypt data chunk by chunk,
and then pass the encrypted data to the write stream.
Or decrypt data from a read stream,
and pass the decrypted data to the write stream.
So transform streams are an essential part of the stream family.
*/
const { createServer } = require("http");
const { createReadStream, stat } = require("fs");
const { promisify } = require("util");
const fileInfo = promisify(stat);
const file = "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch03/powder-day.mp4";

/**
 * Video server with HTTP Range support (206 partial content) so browsers
 * can seek within the video; some browsers (e.g. Safari) will not play
 * it at all without range handling.
 */
createServer(async (req, res) => {
  try {
    const { size } = await fileInfo(file);
    const range = req.headers.range;
    if (range) {
      console.log(range);
      let [start, end] = range.replace(/bytes=/, "").split("-");
      if (start === "") {
        // Suffix range "bytes=-N": the last N bytes of the file.
        // (The previous parseInt("") produced NaN here and broke the headers.)
        start = Math.max(size - parseInt(end, 10), 0);
        end = size - 1;
      } else {
        start = parseInt(start, 10);
        // Clamp an explicit end to the last byte; "bytes=START-" means to EOF.
        end = end ? Math.min(parseInt(end, 10), size - 1) : size - 1;
      }
      if (Number.isNaN(start) || Number.isNaN(end) || start > end) {
        // Malformed or unsatisfiable range
        res.writeHead(416, { "Content-Range": `bytes */${size}` });
        return res.end();
      }
      res.writeHead(206, {
        "Content-Range": `bytes ${start}-${end}/${size}`,
        "Accept-Ranges": "bytes",
        "Content-Length": end + 1 - start,
        "Content-Type": "video/mp4",
      });
      createReadStream(file, { start, end }).pipe(res);
    } else {
      res.writeHead(200, {
        "Content-Length": size,
        "Content-Type": "video/mp4",
      });
      createReadStream(file).pipe(res);
    }
  } catch (err) {
    // stat failed (e.g. file missing): fail the request instead of
    // leaving an unhandled promise rejection
    console.log(err);
    res.writeHead(500, { "Content-Type": "text/plain" });
    res.end("Internal Server Error");
  }
}).listen(3000, () => console.log("server - port 3000"));
/**
 * implementing `range request` is very important as some browsers e.g.
 * Safari might not even load the video if range requests are not handled.
*
* That aside, client can skip the video forward or backward
*
* 206 status code implies partial response content to browser
*
* The Accept-Ranges response HTTP header is a marker used by the server to
* advertise its support of partial requests. The value of this field
* indicates the unit that can be used to define a range.
*/
const { createServer } = require("http");
const { createReadStream, stat, createWriteStream } = require("fs");
const { promisify, inspect } = require("util");
// multiparty: third-party multipart/form-data parser that streams each part
const multiparty = require("multiparty");
// promisified fs.stat, used to read the video's size for range headers
const fileInfo = promisify(stat);
const file = "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch03/powder-day.mp4";
/**
 * Stream the video file to the response, honoring HTTP Range requests
 * (206 partial content) so browsers can seek.
 * @param {import('http').IncomingMessage} req
 * @param {import('http').ServerResponse} res
 */
const responseWithVideo = async (req, res) => {
  try {
    const { size } = await fileInfo(file);
    const range = req.headers.range;
    if (range) {
      console.log(range);
      let [start, end] = range.replace(/bytes=/, "").split("-");
      if (start === "") {
        // Suffix range "bytes=-N": the last N bytes of the file.
        // (The previous parseInt("") produced NaN here and broke the headers.)
        start = Math.max(size - parseInt(end, 10), 0);
        end = size - 1;
      } else {
        start = parseInt(start, 10);
        // Clamp an explicit end to the last byte; "bytes=START-" means to EOF.
        end = end ? Math.min(parseInt(end, 10), size - 1) : size - 1;
      }
      if (Number.isNaN(start) || Number.isNaN(end) || start > end) {
        // Malformed or unsatisfiable range
        res.writeHead(416, { "Content-Range": `bytes */${size}` });
        return res.end();
      }
      res.writeHead(206, {
        "Content-Range": `bytes ${start}-${end}/${size}`,
        "Accept-Ranges": "bytes",
        "Content-Length": end + 1 - start,
        "Content-Type": "video/mp4",
      });
      createReadStream(file, { start, end }).pipe(res);
    } else {
      res.writeHead(200, {
        "Content-Length": size,
        "Content-Type": "video/mp4",
      });
      createReadStream(file).pipe(res);
    }
  } catch (err) {
    // stat failed (e.g. file missing): fail the request instead of
    // leaving an unhandled promise rejection
    console.log(err);
    res.writeHead(500, { "Content-Type": "text/plain" });
    res.end("Internal Server Error");
  }
};
// Upload + streaming server: POST /upload accepts multipart form data,
// GET /video streams the mp4 (with range support), anything else serves the form.
createServer((req, res) => {
if (req.method === "POST" && req.url === "/upload") {
// CAN JUST PARSE DIRECTLY (req is itself a readable stream)
// req.pipe(res);
// req.pipe(process.stdout);
// req.pipe(createWriteStream("./upload.file"));
// CAN VALIDATE FIRST
const form = new multiparty.Form();
// buffered alternative: parse everything, then respond with the parsed result
// form.parse(req, function (err, fields, files) {
// res.writeHead(200, { "content-type": "text/plain" });
// res.write("received upload:\n\n");
// res.end(inspect({ fields: fields, files: JSON.stringify(files) }));
// });
// streaming mode: a "part" event fires for every field/file in the form
form
.on("part", (part) => {
// for a single file
// NOTE(review): this line runs for EVERY part, including plain text
// fields where part.filename is undefined (target becomes
// "./store/single/undefined"); kept as a demo of the single-file
// case — confirm before reusing
part.pipe(createWriteStream(`./store/single/${part.filename}`));
// for mix of fields and files
{
// for fields other than files (no filename) — resume() must be
// called to drain the part, or the form never completes
if (!part.filename) {
console.log({ field: part.name });
part.resume();
} else {
// if single file is expected or multiple expected to be saved
part.pipe(createWriteStream(`./store/${part.filename}`));
// if multiple files are expected but a particular one is needed
if (part.filename.includes("sample"))
part.pipe(createWriteStream(`./store/filter/${part.filename}`));
}
}
})
.on("close", function () {
// all parts have been consumed
console.log("Upload completed!");
res.writeHead(200, { "content-type": "text/plain" });
res.end("Received files");
});
form.parse(req);
} else if (req.url === "/video") responseWithVideo(req, res);
else {
// default route: serve a minimal HTML upload form
res.writeHead(200, { "Content-Type": "text/html" });
res.end(`
<form action="/upload" enctype="multipart/form-data" method="post">
<input type="text" name="title"><br>
<input type="text" name="author"><br>
<input type="file" name="upload" multiple="multiple"><br>
<input type="submit" value="Upload">
</form>
`);
}
}).listen(3000, () => console.log("server - port 3000"));
/**
 * implementing `range request` is very important as some browsers e.g.
 * Safari might not even load the video if range requests are not handled.
*
* That aside, client can skip the video forward or backward
*
* 206 status code implies partial response content to browser
*
* The Accept-Ranges response HTTP header is a marker used by the server to
* advertise its support of partial requests. The value of this field
* indicates the unit that can be used to define a range.
*
* multiparty library can parse all form of requests, fields and files
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment