Skip to content

Instantly share code, notes, and snippets.

@maxymania
Last active December 11, 2019 13:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxymania/f9b21fa69920a7a76a5a62e0738e4148 to your computer and use it in GitHub Desktop.
Save maxymania/f9b21fa69920a7a76a5a62e0738e4148 to your computer and use it in GitHub Desktop.
NodeJS Line Reader using async/await
/*
Copyright (C) 2019 Simon Schmidt
Usage of the works is permitted provided that this instrument is retained with
the works, so that any entity that uses the works is notified of this instrument.
DISCLAIMER: THE WORKS ARE WITHOUT WARRANTY.
*/
const EventEmitter = require('events');
/**
 * Waits for the next 'readable' event on an emitter.
 *
 * @param {EventEmitter} s - Emitter to listen on (normally a Readable stream).
 * @returns {Promise<*>} Fulfilled with the first argument of the event.
 *   The promise is never rejected and only settles on 'readable'.
 */
function waitR(s) {
  const subscribe = (done) => {
    // `once` removes the listener again after the first emission.
    s.once('readable', done);
  };
  return new Promise(subscribe);
}
/**
 * Waits for the next 'drain' event on an emitter.
 *
 * @param {EventEmitter} s - Emitter to listen on (normally a Writable stream).
 * @returns {Promise<*>} Fulfilled with the first argument of the event.
 *   The promise is never rejected and only settles on 'drain'.
 */
function waitDrain(s) {
  return new Promise(function onSubscribe(resolve) {
    // One-shot listener: it is detached automatically after firing.
    s.once('drain', resolve);
  });
}
// Not for streams!
/**
 * Waits for the next 'wake' event on an emitter.
 *
 * 'wake' is the private notification event that Chunker emits on its own
 * internal emitter.
 *
 * @param {EventEmitter} s - Emitter to listen on.
 * @returns {Promise<*>} Fulfilled with the first argument of the event.
 */
function waitWake(s) {
  const executor = (resolve) => {
    s.once('wake', resolve);
  };
  return new Promise(executor);
}
/**
 * Pulls the next chunk out of a Readable stream used in paused mode.
 *
 * Unlike a bare wait for 'readable', this also wakes up on 'end', 'close'
 * and 'error'. Without that, the call could stay pending forever: after the
 * last `s.read()` returns null the stream only emits 'end' (no further
 * 'readable'), and `readableEnded` does not become true until 'end' fires.
 *
 * @param {import('stream').Readable} s - Stream to read from.
 * @returns {Promise<*>} The next truthy chunk, or null once the stream has
 *   ended or was closed.
 * @throws Rejects with the stream's error if it emits 'error' while waiting,
 *   or if the stream was already destroyed with an error (`s.errored`).
 */
async function read(s) {
  for (;;) {
    const chunk = s.read();
    // Truthiness check on purpose: falsy chunks are skipped rather than
    // returned, because Reader treats a falsy result as end-of-stream.
    if (chunk) return chunk;
    if (s.readableEnded) return null;
    if (s.destroyed) {
      if (s.errored) throw s.errored;
      return null;
    }

    const finished = await new Promise((resolve, reject) => {
      const onReadable = () => settle(resolve, false);
      const onFinished = () => settle(resolve, true);
      const onError = (err) => settle(reject, err);
      // Detach every listener before settling so repeated calls do not leak.
      function settle(fn, value) {
        s.removeListener('readable', onReadable);
        s.removeListener('end', onFinished);
        s.removeListener('close', onFinished);
        s.removeListener('error', onError);
        fn(value);
      }
      s.on('readable', onReadable);
      s.on('end', onFinished);
      s.on('close', onFinished);
      s.on('error', onError);
    });

    // After 'end'/'close' nothing more will arrive; hand out whatever is
    // still buffered (normally nothing) and report end-of-stream.
    if (finished) return s.read() || null;
  }
}
/**
 * "Optional await": normalises a value that may or may not be a Promise.
 *
 * @param {*} p - A Promise or a plain value.
 * @returns {Promise<*>} Resolves to the awaited result when `p` is a Promise
 *   instance, otherwise to `p` itself.
 */
async function opawait(p) {
  return p instanceof Promise ? await p : p;
}
/**
 * Buffered reader on top of a Readable stream used in paused mode.
 *
 * Chunks are pulled from the stream with the module-level `read()` helper;
 * whatever part of a chunk has not been handed out yet is kept in `rest`.
 *
 * @constructor
 * @param {import('stream').Readable} s - Stream to read from.
 */
function Reader(s) {
  this.stream = s;
  // Unconsumed tail of the most recently fetched chunk, or null.
  this.rest = null;
}

/**
 * Reads up to `n` bytes.
 *
 * @param {number} [n] - Maximum length to return. When omitted or 0, the
 *   whole buffered remainder (or the next stream chunk) is returned.
 * @returns {Promise<*>} A chunk of at most `n` bytes, or null at end of
 *   stream.
 */
Reader.prototype.read = async function (n) {
  for (;;) {
    const rest = this.rest;
    if (rest) {
      // No limit, or the limit covers everything buffered: hand it all out.
      if (!n || n >= rest.length) {
        this.rest = null;
        return rest;
      }
      this.rest = rest.slice(n);
      return rest.slice(0, n);
    }
    if (!n) return await read(this.stream);
    this.rest = await read(this.stream);
    if (!this.rest) return null;
  }
};

/**
 * Reads one fragment of a line.
 *
 * @returns {Promise<Array>} A pair `[fragment, isPrefix]`. `isPrefix` is true
 *   when the fragment contains no line feed, i.e. the line continues in the
 *   next fragment. A fragment that completes a line includes its trailing
 *   line feed. At end of stream the pair is `[null, false]`.
 */
Reader.prototype.readLinePart = async function () {
  for (;;) {
    const rest = this.rest;
    if (rest) {
      // Buffers are searched by byte value. Strings (stream with an encoding
      // set) must be searched by character, because String#indexOf(0xa)
      // would look for the text "10".
      const newline = typeof rest === 'string' ? rest.indexOf('\n') : rest.indexOf(0xa);
      if (newline === -1) {
        this.rest = null;
        return [rest, true];
      }
      const end = newline + 1;
      if (end === rest.length) {
        this.rest = null;
        return [rest, false];
      }
      this.rest = rest.slice(end);
      return [rest.slice(0, end), false];
    }
    this.rest = await read(this.stream);
    if (!this.rest) return [null, false];
  }
};

/**
 * Reads one complete line, decoding each fragment as latin1.
 *
 * @returns {Promise<string>} The line including its trailing line feed when
 *   one was found. An empty string means the stream ended before any data
 *   was read; a final line without line feed is returned as is.
 */
Reader.prototype.readLine = async function () {
  const pieces = [];
  for (;;) {
    const [fragment, isPrefix] = await this.readLinePart();
    if (!fragment) break;
    pieces.push(fragment.toString('latin1'));
    if (!isPrefix) break;
  }
  return pieces.join('');
};

/**
 * Transfers exactly `n` bytes (fewer only at end of stream) to `dest`.
 *
 * @param {number} n - Number of bytes to transfer.
 * @param {function(*): (*|Promise<*>)} dest - Receives each chunk; a returned
 *   promise is awaited before the next chunk is delivered.
 * @returns {Promise<number>} The number of bytes actually transferred.
 */
Reader.prototype.readFully = async function (n, dest) {
  // Nothing requested: do not block on the stream or call dest with an
  // empty slice.
  if (n === 0) return 0;

  let remaining = n;
  for (;;) {
    const rest = this.rest;
    if (rest) {
      if (rest.length > remaining) {
        // The buffered chunk holds more than needed: split it.
        this.rest = rest.slice(remaining);
        // `await` accepts plain values as well as promises.
        await dest(rest.slice(0, remaining));
        return n;
      }
      this.rest = null;
      await dest(rest);
      remaining -= rest.length;
      if (remaining === 0) return n;
    }
    this.rest = await read(this.stream);
    // End of stream: report the short count.
    if (this.rest == null) return n - remaining;
  }
};
/**
 * Adapts a flowing-mode stream ('data' events) to a pull-style async
 * `read()` with a small bounded queue.
 *
 * The stream is paused once `maxQueued` chunks are waiting and resumed when
 * the consumer catches up. Stream failure ('error') and premature shutdown
 * ('close' without 'end') are tracked as well, so a pending `read()` cannot
 * stay unresolved forever.
 *
 * @constructor
 * @param {EventEmitter} stream - Source stream; must provide `pause()` and
 *   `resume()`.
 * @param {number} [maxQueued=4] - Queue length at which the stream is paused.
 */
function Chunker(stream, maxQueued = 4) {
  this.stream = stream;
  this.queue = [];
  this.last = false;
  this.error = null;
  this.maxQueued = maxQueued;
  // Private emitter used only to wake up pending read() calls.
  this.ev = new EventEmitter();
  stream.on('end', () => this._end());
  // 'close' without a preceding 'end' (destroyed stream) also ends the data.
  stream.on('close', () => this._end());
  stream.on('error', (err) => this._error(err));
  stream.on('data', (chunk) => this._data(chunk));
}

/** Marks the stream as finished and wakes pending readers. */
Chunker.prototype._end = function () {
  this.last = true;
  this.ev.emit('wake');
};

/**
 * Records a stream error and wakes pending readers.
 * @param {Error} err - The error emitted by the stream.
 */
Chunker.prototype._error = function (err) {
  this.error = err;
  this.last = true;
  this.ev.emit('wake');
};

/**
 * Queues an incoming chunk, wakes pending readers and applies backpressure.
 * @param {*} chunk - Chunk delivered by the stream's 'data' event.
 */
Chunker.prototype._data = function (chunk) {
  this.queue.push(chunk);
  this.ev.emit('wake');
  if (this.queue.length >= this.maxQueued) {
    this.stream.pause();
  }
};

/**
 * Returns the next queued chunk, waiting for one if necessary.
 *
 * @returns {Promise<*>} The next chunk, or null once the stream has finished
 *   and the queue is empty.
 * @throws Rejects with the stream's error once all chunks queued before the
 *   error have been delivered.
 */
Chunker.prototype.read = async function () {
  while (this.queue.length === 0) {
    if (this.error) throw this.error;
    if (this.last) return null;
    this.stream.resume();
    await waitWake(this.ev);
  }
  const chunk = this.queue.shift();
  if (this.queue.length < this.maxQueued) {
    this.stream.resume();
  }
  return chunk;
};
/**
 * Writes a chunk and, if the stream signals backpressure, waits until it is
 * safe to write again.
 *
 * Waiting for 'drain' alone can hang: a stream that errors or is destroyed
 * never drains, and Node.js does not emit 'drain' once `end()` has been
 * called. Therefore 'finish', 'error' and 'close' settle the wait too.
 *
 * @param {import('stream').Writable} s - Destination stream.
 * @param {*} chunk - Data passed to `s.write()`.
 * @returns {Promise<void>} Resolves when the chunk was accepted without
 *   backpressure, or after 'drain' / 'finish'.
 * @throws Rejects with the stream's error on 'error', or with an Error if
 *   the stream closes before draining.
 */
async function write(s, chunk) {
  if (s.write(chunk)) return;

  await new Promise((resolve, reject) => {
    const onDone = () => settle(resolve);
    const onError = (err) => settle(reject, err);
    const onClose = () => settle(reject, new Error('stream closed before drain'));
    // Detach every listener before settling so repeated calls do not leak.
    function settle(fn, value) {
      s.removeListener('drain', onDone);
      s.removeListener('finish', onDone);
      s.removeListener('error', onError);
      s.removeListener('close', onClose);
      fn(value);
    }
    s.on('drain', onDone);
    s.on('finish', onDone);
    s.on('error', onError);
    s.on('close', onClose);
  });
}
/**
 * Wraps a writable stream as a destination callback.
 *
 * @param {import('stream').Writable} s - Stream that receives the chunks.
 * @returns {function(*): Promise<void>} Function that forwards one chunk to
 *   `write(s, chunk)` and returns its promise.
 */
function asDest(s) {
  return function (chunk) {
    return write(s, chunk);
  };
}
// Public API of this module: the buffered Reader, the backpressure-aware
// write helper, its destination-callback adapter, and the Chunker.
module.exports = {Reader,write,asDest,Chunker};
/*
Copyright (C) 2019 Simon Schmidt
Usage of the works is permitted provided that this instrument is retained with
the works, so that any entity that uses the works is notified of this instrument.
DISCLAIMER: THE WORKS ARE WITHOUT WARRANTY.
*/
/**
 * Starts a task in fire-and-forget fashion and logs any failure.
 *
 * `b` is invoked synchronously. Failures are reported with `console.log`
 * whether `b` rejects asynchronously or throws synchronously; a `b` that
 * returns a plain value instead of a promise is accepted too.
 *
 * @param {function(): (Promise<*>|*)} b - Task to run; usually an async
 *   function.
 * @returns {void}
 */
function start(b) {
  const report = (e) => {
    console.log(e);
  };
  try {
    // Promise.resolve passes native promises through unchanged and wraps
    // plain return values, so `.catch` is always available.
    Promise.resolve(b()).catch(report);
  } catch (e) {
    // Synchronous throw from a non-async task.
    report(e);
  }
}
/**
 * Wraps a single-argument callback so that it runs at most one time.
 *
 * @param {function(*): *} f - Callback to guard.
 * @returns {function(*): void} Wrapper that forwards its argument to `f` on
 *   the first call and does nothing afterwards. The wrapper always returns
 *   undefined; the result of `f` is discarded.
 */
function once(f) {
  let fired = false;
  return (v) => {
    if (!fired) {
      // Flip the flag before calling, so a throwing or re-entrant `f`
      // still counts as having run.
      fired = true;
      f(v);
    }
  };
}
// Add the task helpers to the existing exports object without replacing it.
module.exports.start = start;
module.exports.once = once;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment