Skip to content

Instantly share code, notes, and snippets.

@bjoerge
Created March 26, 2015 19:14
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bjoerge/9e88a877a0633705b728 to your computer and use it in GitHub Desktop.
Save bjoerge/9e88a877a0633705b728 to your computer and use it in GitHub Desktop.
Parse CSV into objects with RxJS
const Rx = require('rx');
const csv = require('csv-parse');
const fs = require('fs');
// Parse file.csv and print every data row as an object keyed by the header row.
// The original snippet referenced an undeclared `rows`; bind the parsed stream
// to it so both the header and the data rows come from the same source.
// NOTE(review): relies on rx-node's fromReadableStream being shared
// (publish().refCount()) so both subscriptions see the same records.
const rows = Rx.Node.fromReadableStream(fs.createReadStream('file.csv').pipe(csv()));

rows
  .skip(1)
  .withLatestFrom(rows.take(1), (row, header) =>
    // Map header[i] => row[i]
    row.reduce((rowObj, cell, i) => {
      rowObj[header[i]] = cell;
      return rowObj;
    }, {}))
  .subscribe((row) => {
    console.log('Row: %s', JSON.stringify(row, null, 2));
  });
@iDVB
Copy link

iDVB commented Jul 21, 2015

`rows` is not defined: the snippet calls `rows.take(1)` without ever declaring `rows`.

@sirudog
Copy link

sirudog commented Jul 7, 2016

@iDVB is right, but with a small change it works, and it is also more readable:

// Stream of parsed CSV records (each an array of cells) read from file.csv.
const csvFile$ = Rx.Node.fromReadableStream(fs.createReadStream('file.csv').pipe(csv()));
// The first record holds the column names; every later record is a data row.
const header$ = csvFile$.take(1);
const rows$ = csvFile$.skip(1);

// Pair each data row with the header and build { [header[i]]: row[i] }.
rows$
  .withLatestFrom(header$, (row, header) => row.reduce((rowObj, cell, i) => {
    rowObj[header[i]] = cell;
    return rowObj;
  }, {}))
  .subscribe((row) => {
    console.log('Row: %s', JSON.stringify(row, null, 2));
  });

@QuentinRoy
Copy link

QuentinRoy commented Feb 11, 2017

Rx.Node no longer seems to be maintained and is not compatible with RxJS 5. Here is a version of fromStream that works with RxJS 5:

import Rx from 'rxjs/Rx';

const Observable = Rx.Observable;

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
/**
 * Wrap a Node.js stream as a shared (ref-counted) RxJS Observable.
 *
 * The stream is paused right away and resumed once the shared subscription
 * connects. All subscribers share that single subscription
 * (`publish().refCount()`), so the stream listeners are attached once and
 * removed when the shared subscription is torn down.
 *
 * @param {object} stream - Readable-like emitter exposing pause()/resume().
 * @param {string} [finishEventName='end'] - Event that completes the Observable.
 * @param {string} [dataEventName='data'] - Event whose payload is emitted via next().
 * @returns {Observable} Emits each data event's payload, forwards an 'error'
 *   event as an error notification, and completes on the finish event.
 */
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  const source = new Observable((subscriber) => {
    // [event name, listener] pairs; attached and detached in the same order.
    const bindings = [
      [dataEventName, (chunk) => subscriber.next(chunk)],
      ['error', (failure) => subscriber.error(failure)],
      [finishEventName, () => subscriber.complete()],
    ];

    bindings.forEach(([event, listener]) => stream.addListener(event, listener));
    stream.resume();

    return () => {
      bindings.forEach(([event, listener]) => stream.removeListener(event, listener));
    };
  });

  return source.publish().refCount();
}

And here is my own version of the RxJS CSV parser; it is more concise and supports parser options:

import Rx from 'rxjs/Rx';
import fs from 'fs';
import csv from 'csv-parse';
import fromStream from './rxjs-from-stream';

const Observable = Rx.Observable;

/**
 * Lazily parse a CSV file into an Observable of records.
 *
 * Nothing is read until the Observable is subscribed; each subscription opens
 * its own read stream and its own csv-parse parser.
 *
 * @param {string} file - Path of the CSV file to read.
 * @param {object} [csvParserOptions] - Options passed unchanged to csv-parse.
 * @returns {Observable} Emits each record produced by the parser, errors if the
 *   file cannot be read or the parser fails, and completes at end of input.
 */
const parse = (file, csvParserOptions) => new Observable((observer) => {
  const parser = csv(csvParserOptions);
  const fileStream = fs.createReadStream(file);
  // pipe() does not forward errors from the source stream (e.g. ENOENT) to the
  // parser; without this listener they would be thrown as an unhandled 'error'
  // event instead of reaching the subscriber.
  fileStream.on('error', (err) => observer.error(err));
  const lines$ = fromStream(fileStream.pipe(parser));
  // Return the inner Subscription (valid teardown logic), not the Observable,
  // so unsubscribing from the outer Observable also detaches from the parser.
  return lines$.subscribe(observer);
});

export default parse;

Example of use:

import path from 'path';
import csv from './utils/rxjs-csv';

/**
 * Build an RxJS observer that logs every notification, prefixed with `name`.
 *
 * @param {string} name - Label prepended to each log line.
 * @returns {{next: Function, error: Function, complete: Function}} Observer object.
 */
const getLogSubscriber = (name) => ({
  next(o) { console.log(`${name} next`, o); },
  // RxJS delivers errors to `error`; a handler named `err` would never be called.
  error(err) { console.error(`${name} error`, err); },
  complete() { console.log(`${name} complete`); },
});

// Parse file.csv with csv-parse's `columns: true` option and log each record,
// any error, and completion.
csv('file.csv', { columns: true })
  .subscribe({
    next(o) { console.log(o); },
    // The observer key must be `error`; RxJS never calls a handler named `err`.
    error(err) { console.error('error:', err); },
    complete() { console.log('complete'); },
  });

@tokland
Copy link

tokland commented Feb 18, 2018

@QuentinRoy: That's very cool, and it probably deserves its own gist. Where is getLogSubscriber used? Typo: `csv('file.csv'), { columns: true })` has a stray parenthesis; it should be `csv('file.csv', { columns: true })`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment