|
#!/usr/local/bin/node --max_old_space_size=1024 |
|
"use strict"; |
|
|
|
const fs = require('fs'); |
|
|
|
/* eslint-disable */ |
|
|
|
class StreamBuffer { |
|
constructor() { |
|
this.blocks = []; |
|
this.blockSize = 256 * 1024; |
|
this.count = 0; |
|
this.lastBlockLength = 0; |
|
} |
|
|
|
get length() { |
|
return this.blocks.length === 0 |
|
? 0 |
|
: (this.blocks.length - 1) * this.blockSize + this.lastBlockLength; |
|
} |
|
|
|
set length(value) { |
|
if (value < 0) throw new Error; |
|
this.blocks.length = Math.ceil(value / this.blockSize); |
|
|
|
for (let index = 0; index < this.blocks.length; ++index) |
|
if (!this.blocks[index]) |
|
this.blocks[index] = new Buffer(this.blockSize); |
|
|
|
this.lastBlockLength = ((value - 1) % this.blockSize) + 1; |
|
} |
|
|
|
write(buffer, offset = 0, length = buffer.length) { |
|
if (!buffer) throw new Error("Write request a Buffer as first parameter"); |
|
if (offset + length > buffer.length) throw new Error("Write buffer params out of index"); |
|
var bufferStart = offset, start = this.length, howMany, partToWrite, toBreak; |
|
this.length += length; |
|
while (true) { |
|
howMany = this.blockSize - this.offsetWithinBlock(start); |
|
if (howMany >= length - bufferStart) { |
|
howMany = length - bufferStart; |
|
toBreak = true; |
|
} |
|
partToWrite = buffer.slice(offset, offset + length); |
|
this.writeBlock(this.blockIndexByByteOffset(start), this.offsetWithinBlock(start), partToWrite); |
|
if (toBreak) break; |
|
bufferStart += howMany; |
|
start += howMany; |
|
} |
|
return this; |
|
} |
|
|
|
writeBlock(blockIndex, offset, buffer) { |
|
var block; |
|
block = this.blocks[blockIndex]; |
|
return buffer.copy(block, offset); |
|
} |
|
|
|
offsetWithinBlock(index) { |
|
return index % this.blockSize; |
|
} |
|
|
|
blockIndexByByteOffset(index) { |
|
return Math.floor(index / this.blockSize); |
|
} |
|
|
|
slice(start, end) { |
|
if (start > end) throw new Error("start should less than end") |
|
if (end > this.length) throw new Error("Index out of range"); |
|
if (start < 0 || end <= 0) throw new Error("Invalid Offset or Index"); |
|
var block, buffer, buffers = [], howMany, length = end - start; |
|
while (start < end) { |
|
howMany = this.blockSize - this.offsetWithinBlock(start); |
|
if (howMany > end - start) howMany = end - start; |
|
block = this.blocks[this.blockIndexByByteOffset(start)]; |
|
buffer = block.slice(this.offsetWithinBlock(start), this.offsetWithinBlock(start) + howMany); |
|
buffers.push(buffer); |
|
start += howMany; |
|
} |
|
return Buffer.concat(buffers, length); |
|
} |
|
|
|
byteAt(index) { |
|
if (index >= this.length) throw new Error("Index Error"); |
|
let blockIndex = Math.floor(index / this.blockSize); |
|
if (blockIndex >= this.blocks.length) throw new Error("Range Error"); |
|
let block = this.blocks[blockIndex]; |
|
let tailIndex = index % this.blockSize; |
|
return block[tailIndex]; |
|
} |
|
|
|
static getStream(path) { |
|
return typeof path.read === 'function' |
|
? path |
|
: typeof path === 'string' || path instanceof String |
|
? fs.createReadStream(path) |
|
: null |
|
} |
|
|
|
static fromFile(path) { |
|
return new Promise((resolve, reject) => { |
|
try { |
|
resolve(this.fromFileSync(path)) |
|
} catch (e) { |
|
reject(e); |
|
} |
|
}); |
|
} |
|
|
|
static fromFileSync(path) { |
|
let KB = 1024, |
|
bb = new StreamBuffer(), |
|
readStream = StreamBuffer.getStream(path), |
|
fd = readStream.fd, |
|
buffer = new Buffer(bb.blockSize), |
|
total = 0, |
|
mayBeTotal = 0, |
|
howManyRead, oldLength; |
|
|
|
while (true) { |
|
try { |
|
howManyRead = fs.readSync(fd, buffer, 0, buffer.length, null); |
|
} catch (e) { |
|
if (e.code === 'EAGAIN') { // 'resource temporarily unavailable' |
|
// Happens on OS X 10.8.3 (not Windows 7!), if there no stdin input - typically when invoking a script without any input (for interactive stdin input) |
|
// If you were to just continue, you'd create a tight loop |
|
throw 'ERROR: interactive stdin input not supported.'; |
|
} else if (e.code === 'EOF') { |
|
// Happens on Windows 7, but not OS X 10.8.3: simply signals the end of *piped* stdin input |
|
break; |
|
}; |
|
throw e; // unexpected exception |
|
} |
|
if (!howManyRead || howManyRead === 0) { // No more stdin input available |
|
// OS X 10.8.3: regardless of input method, this is how the end of input is signaled |
|
// Windows 7: this is how the end of input is signaled for *interactive* stdin input |
|
break; |
|
} |
|
oldLength = bb.length; |
|
total += howManyRead; |
|
bb.write(buffer, 0, howManyRead); |
|
mayBeTotal += bb.length - oldLength; |
|
} |
|
return bb; |
|
} |
|
} |
|
|
|
class LineReaderPromise { |
|
constructor(buffer) { |
|
this.buffer = buffer; |
|
this.index = 0; |
|
} |
|
|
|
readLine() { |
|
let lineCode = "\n".charCodeAt(0), buffer = this.buffer; |
|
if (this.index === buffer.length) return null; |
|
if (buffer.byteAt(this.index) === "\n") { |
|
this.index++; |
|
return ""; |
|
} |
|
let end = this.index, result; |
|
while (end < buffer.length) { |
|
if (lineCode === buffer.byteAt(end)) { |
|
result = buffer.slice(this.index, end).toString(); |
|
this.index = end + 1; |
|
return result; |
|
} |
|
end = end + 1; |
|
} |
|
this.index = buffer.length; |
|
return buffer.slice(this.index, end).toString(); |
|
} |
|
|
|
eachLine(callback) { |
|
return new Promise((resolve, reject) => { |
|
let result; |
|
try { |
|
while (true) { |
|
var line = this.readLine(); |
|
if (line === null) return resolve(this); |
|
callback(line); |
|
} |
|
} catch (e) { |
|
reject(e); |
|
} |
|
}); |
|
} |
|
|
|
static open(path) { |
|
return StreamBuffer |
|
.fromFile(path) |
|
.then(buffer => new LineReaderPromise(buffer)) |
|
} |
|
} |
|
|
|
module.exports = LineReaderPromise; |