Skip to content

Instantly share code, notes, and snippets.

@xaota
Created January 11, 2018 09:01
Show Gist options
  • Save xaota/1195fe44109e07363f7f206991b7ef7f to your computer and use it in GitHub Desktop.
Save xaota/1195fe44109e07363f7f206991b7ef7f to your computer and use it in GitHub Desktop.
чтение из файла / потока в синхронном треде

использование

const LineReaderPromise = require('./line-reader-promise.js');

LineReaderPromise.open(process.stdin)
  .then(reader => reader.eachLine(eachLine))
  .then(endOfFile)
  .catch(err => emptyStream());
  
function eachLine(line) { ... } // построчная обработка файла
function endOfFile() { ... }    // запустится после прочтения всех строк
function emptyStream() { ... }  // запустится, если поток был пуст

в чем смысл?

Вы можете "завязаться" на успешность чтения потока, или, наоборот, на его отсутствие
(в обычном случае этого сделать нельзя, т.к. никакие события наличия данных в потоке не сработают, а т.к. всё асинхронно, извне неизвестно, что поток был пуст)

фича

синхронное чтение из потока (!) может быть долгим, но за счет маленького буфера процесс ноды не должен блокироваться.

#!/usr/local/bin/node --max_old_space_size=1024
"use strict";
const fs = require('fs');
/* eslint-disable */
/**
 * Growable byte store backed by a list of fixed-size Buffer blocks.
 *
 * Bytes are appended with `write()` and read back with `slice()` / `byteAt()`.
 * The logical size is `(blocks.length - 1) * blockSize + lastBlockLength`.
 */
class StreamBuffer {
  constructor() {
    this.blocks = [];
    this.blockSize = 256 * 1024;
    this.count = 0;
    this.lastBlockLength = 0;
  }

  /**
   * Number of bytes currently stored.
   * @returns {number}
   */
  get length() {
    return this.blocks.length === 0
      ? 0
      : (this.blocks.length - 1) * this.blockSize + this.lastBlockLength;
  }

  /**
   * Resizes the store, allocating any missing blocks.
   * @param {number} value - new size in bytes, must not be negative
   * @throws {Error} when `value` is negative
   */
  set length(value) {
    if (value < 0) throw new Error("Length must not be negative");
    this.blocks.length = Math.ceil(value / this.blockSize);
    for (let index = 0; index < this.blocks.length; ++index) {
      if (!this.blocks[index]) {
        // Zero-filled allocation so bytes that were never written cannot leak old memory.
        this.blocks[index] = Buffer.alloc(this.blockSize);
      }
    }
    this.lastBlockLength = value === 0 ? 0 : ((value - 1) % this.blockSize) + 1;
  }

  /**
   * Appends `length` bytes of `buffer`, starting at `offset`, to the end of the store.
   *
   * @param {Buffer} buffer - source bytes
   * @param {number} [offset=0] - first source byte to copy
   * @param {number} [length] - number of bytes to copy; defaults to the rest of `buffer` after `offset`
   * @returns {StreamBuffer} this instance, for chaining
   * @throws {Error} when `buffer` is missing or the requested range is outside of it
   */
  write(buffer, offset = 0, length = buffer ? buffer.length - offset : 0) {
    if (!buffer) throw new Error("Write request a Buffer as first parameter");
    if (offset < 0 || length < 0 || offset + length > buffer.length) {
      throw new Error("Write buffer params out of index");
    }
    const sourceEnd = offset + length;
    let source = offset;
    let target = this.length;
    this.length += length;
    while (source < sourceEnd) {
      const within = this.offsetWithinBlock(target);
      // Copy only what fits into the current block and what is still left in the source.
      const howMany = Math.min(this.blockSize - within, sourceEnd - source);
      this.writeBlock(
        this.blockIndexByByteOffset(target),
        within,
        buffer.subarray(source, source + howMany)
      );
      source += howMany;
      target += howMany;
    }
    return this;
  }

  /**
   * Copies `buffer` into the block `blockIndex` starting at `offset` inside that block.
   * @param {number} blockIndex
   * @param {number} offset
   * @param {Buffer} buffer
   * @returns {number} number of bytes copied
   */
  writeBlock(blockIndex, offset, buffer) {
    const block = this.blocks[blockIndex];
    return buffer.copy(block, offset);
  }

  /**
   * @param {number} index - absolute byte offset
   * @returns {number} offset of that byte inside its block
   */
  offsetWithinBlock(index) {
    return index % this.blockSize;
  }

  /**
   * @param {number} index - absolute byte offset
   * @returns {number} index of the block holding that byte
   */
  blockIndexByByteOffset(index) {
    return Math.floor(index / this.blockSize);
  }

  /**
   * Returns a copy of the bytes in `[start, end)` as a single Buffer.
   * An empty range (`start === end`, including `slice(0, 0)`) yields an empty Buffer.
   *
   * @param {number} start - inclusive start offset
   * @param {number} end - exclusive end offset
   * @returns {Buffer}
   * @throws {Error} when the range is reversed, negative or past the end of the store
   */
  slice(start, end) {
    if (start > end) throw new Error("start should less than end");
    if (end > this.length) throw new Error("Index out of range");
    if (start < 0 || end < 0) throw new Error("Invalid Offset or Index");
    const parts = [];
    const total = end - start;
    let position = start;
    while (position < end) {
      const within = this.offsetWithinBlock(position);
      const howMany = Math.min(this.blockSize - within, end - position);
      const block = this.blocks[this.blockIndexByByteOffset(position)];
      parts.push(block.subarray(within, within + howMany));
      position += howMany;
    }
    // Buffer.concat copies, so the result does not alias the internal blocks.
    return Buffer.concat(parts, total);
  }

  /**
   * Returns the byte stored at `index`.
   * @param {number} index - absolute byte offset
   * @returns {number} byte value
   * @throws {Error} when `index` is outside of the stored data
   */
  byteAt(index) {
    if (index < 0 || index >= this.length) throw new Error("Index Error");
    const blockIndex = Math.floor(index / this.blockSize);
    if (blockIndex >= this.blocks.length) throw new Error("Range Error");
    return this.blocks[blockIndex][index % this.blockSize];
  }

  /**
   * Normalizes the input to a readable stream.
   *
   * @param {object|string} path - an object with a `read` method, or a file path
   * @returns {object|null} the object itself when it has a `read` method,
   *   `fs.createReadStream(path)` for a string, otherwise `null`
   */
  static getStream(path) {
    if (path !== null && path !== undefined && typeof path.read === 'function') {
      return path;
    }
    if (typeof path === 'string' || path instanceof String) {
      return fs.createReadStream(String(path));
    }
    return null;
  }

  /**
   * Promise wrapper around `fromFileSync`; the read itself is still synchronous.
   * @param {object|string} path - see `fromFileSync`
   * @returns {Promise<StreamBuffer>} rejects with whatever `fromFileSync` throws
   */
  static fromFile(path) {
    return new Promise((resolve, reject) => {
      try {
        resolve(this.fromFileSync(path));
      } catch (e) {
        reject(e);
      }
    });
  }

  /**
   * Synchronously reads everything available from a file path or from a stream
   * that exposes a numeric `fd` (for example `process.stdin`).
   *
   * @param {object|string} path - file path, or stream-like object with `read` and a numeric `fd`
   * @returns {StreamBuffer} buffer holding all bytes that were read
   * @throws {TypeError} when the source has no usable file descriptor
   * @throws {Error} with `code === 'EAGAIN'` when the descriptor has no data available
   */
  static fromFileSync(path) {
    const isPath = typeof path === 'string' || path instanceof String;
    let fd;
    if (isPath) {
      // fs.createReadStream opens its descriptor asynchronously, so `.fd` is not
      // available yet for a synchronous read; open the file directly instead.
      fd = fs.openSync(String(path), 'r');
    } else {
      const stream = StreamBuffer.getStream(path);
      fd = stream ? stream.fd : undefined;
      if (typeof fd !== 'number') {
        throw new TypeError("StreamBuffer source must be a file path or a stream with a numeric fd");
      }
    }

    const result = new StreamBuffer();
    // Scratch buffer: only the bytes reported by readSync are ever copied out of it.
    const chunk = Buffer.allocUnsafe(result.blockSize);
    try {
      while (true) {
        let howManyRead;
        try {
          howManyRead = fs.readSync(fd, chunk, 0, chunk.length, null);
        } catch (e) {
          if (e.code === 'EAGAIN') {
            // 'resource temporarily unavailable': seen on OS X when a script is started
            // without piped stdin. Retrying here would spin in a tight loop.
            const error = new Error('ERROR: interactive stdin input not supported.');
            error.code = 'EAGAIN';
            throw error;
          }
          if (e.code === 'EOF') {
            // Windows reports the end of piped stdin as an error instead of a zero-length read.
            break;
          }
          throw e; // unexpected exception
        }
        if (!howManyRead) break; // zero bytes read: end of input
        result.write(chunk, 0, howManyRead);
      }
    } finally {
      // Only close descriptors opened here; caller-supplied streams stay open.
      if (isPath) fs.closeSync(fd);
    }
    return result;
  }
}
/**
 * Reads "\n"-separated lines out of an in-memory byte store.
 *
 * The store only needs a `length` property, `byteAt(index)` returning a byte
 * value and `slice(start, end)` returning something whose `toString()` decodes
 * the bytes.
 */
class LineReaderPromise {
  /**
   * @param {object} buffer - byte store to read lines from
   */
  constructor(buffer) {
    this.buffer = buffer;
    this.index = 0;
  }

  /**
   * Returns the next line without its terminating "\n" and advances the cursor.
   * A final line that has no trailing newline is returned as well. Only "\n"
   * is treated as a separator, so a preceding "\r" stays part of the line.
   *
   * @returns {string|null} the line, or `null` once all data has been consumed
   */
  readLine() {
    const NEWLINE = 0x0a;
    const buffer = this.buffer;
    const total = buffer.length;
    if (this.index >= total) return null;

    const start = this.index;
    let end = start;
    while (end < total && buffer.byteAt(end) !== NEWLINE) {
      end += 1;
    }
    // Step over the newline when one was found, otherwise stop at the end of the data.
    this.index = end < total ? end + 1 : total;
    // Empty lines are returned directly, without asking the store for an empty range.
    return end === start ? "" : buffer.slice(start, end).toString();
  }

  /**
   * Calls `callback(line)` for every remaining line. The callback is invoked
   * synchronously, before this method returns.
   *
   * @param {function(string): void} callback - invoked once per line
   * @returns {Promise<LineReaderPromise>} resolves with this reader after the last line;
   *   rejects with the error thrown by the callback or by the underlying store
   */
  eachLine(callback) {
    return new Promise((resolve, reject) => {
      try {
        let line = this.readLine();
        while (line !== null) {
          callback(line);
          line = this.readLine();
        }
        resolve(this);
      } catch (e) {
        reject(e);
      }
    });
  }

  /**
   * Loads the whole source through `StreamBuffer.fromFile` and wraps it in a reader.
   *
   * @param {object|string} path - passed through unchanged to `StreamBuffer.fromFile`
   * @returns {Promise<LineReaderPromise>}
   */
  static open(path) {
    return StreamBuffer
      .fromFile(path)
      .then(buffer => new LineReaderPromise(buffer));
  }
}
module.exports = LineReaderPromise;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment