Skip to content

Instantly share code, notes, and snippets.

@yvele
Created October 22, 2015 09:52
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yvele/447555b1c5060952a279 to your computer and use it in GitHub Desktop.
Save yvele/447555b1c5060952a279 to your computer and use it in GitHub Desktop.
Line by line file reading with RxJS on Node.js
var Rx = require('rx');
var readline = require('readline');
var fs = require('fs');
var rl = readline.createInterface({
input: fs.createReadStream('lines.txt')
});
var lines = Rx.Observable.fromEvent(rl, 'line')
.takeUntil(Rx.Observable.fromEvent(rl, 'close'))
.subscribe(
console.log,
err => console.log("Error: %s", err),
() => console.log("Completed"));
@barkthins
Copy link

I get an error

TypeError: Cannot read property 'isTTY' of undefined
          at new Interface (readline.js:73:24)
          at Object.exports.createInterface (readline.js:39:10)

@barkthins
Copy link

barkthins commented Jan 26, 2016

I prefer skipping the whole readline include and just rolling this. This way you have the whole downstream Rx filter() map() etc. working with minimum of fuss:

function Tail(filename) {

    source = new Rx.Subject();

    var fh = fs.createReadStream(filename, 'utf8');
    var buffer = '';

    fh.on('data', function(data) {
            buffer += data;
            var parts = buffer.split('\n');
            buffer = parts.pop();
            parts.forEach(function (x) {
                x = JSON.parse(x);   // really should do this downstream, but for example doing this here
                source.onNext(x);
            });
    });
    fh.on('close', function(data) {
        source.onCompleted();
    });  

    return source;
}

@marceljuenemann
Copy link

readline's Interface implements AsyncIterable<string> to iterate over all lines, and RxJS supports from(AsyncIterable<T>) since version 7. Therefore, this is now a simple one liner:

import readline from "readline"
import { from } from 'rxjs'

const input = fs.createReadStream('lines.txt')
const lines = from(readline.createInterface({input}))
lines.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log("DONE!")
})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment