Skip to content

Instantly share code, notes, and snippets.

@alessioalex
Created November 22, 2014 13:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alessioalex/d852f05a99c4acb0e4c3 to your computer and use it in GitHub Desktop.
Save alessioalex/d852f05a99c4acb0e4c3 to your computer and use it in GitHub Desktop.
CSV-streaming-parser

instructions

  • first install the dependencies:
npm i 
  • download the earthquake data locally (it must be saved as earthquakes.csv, next to example.js):
curl http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv > earthquakes.csv
  • run the example:
node example.js
"use strict";
// CSV data downloaded from the following url:
// http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv
var fs = require('fs');
var csv = require('csv-parser');
var FilterStream = require('./filter-stream');
var Table = require('cli-table');
var bubbleError = require('bubble-stream-error').bubble;
// cli table with the columns Place / Depth / Time; the header cells are given
// the 'cyan' style
var tableHeaders = ['Place', 'Depth', 'Time'];
var tableStyle = { head: ['cyan'] };
var table = new Table({
  head: tableHeaders,
  style: tableStyle
});
// create a filtering stream that only lets through the earthquakes whose
// `place` mentions New Guinea (case-insensitive) and whose depth, truncated to
// an integer by parseInt, is greater than 80 (i.e. at least 81)
var filterStream = new FilterStream({
  filter: function(item) {
    // `test` gives a plain boolean; `exec` would build a match array we never use
    return /New Guinea/i.test(item.place) && (parseInt(item.depth, 10) > 80);
  }
});
// Prints every record that made it through the filter.
// Disabled alternative: collect the rows into `table` instead of logging them:
//   table.push([item.place, item.depth, item.time]);
function onFilteredItem(item) {
  console.log(item);
}

// Intentionally does nothing when the filtered stream ends.
// Disabled alternative: print the collected table:
//   console.log('\n');
//   console.log(table.toString());
//   console.log('\n');
function onFinished() {}

// Rethrows whatever error reaches the filter stream.
function rethrow(err) {
  throw err;
}

var rs = fs.createReadStream(__dirname + '/earthquakes.csv');
var csvStream = csv();

// NOTE(review): presumably forwards 'error' events from `rs` and `csvStream`
// to `filterStream` so the single handler below sees them all; confirm
// against the bubble-stream-error documentation
bubbleError(filterStream, [rs, csvStream]);
filterStream.on('error', rethrow);

// file contents -> csv parser -> filter
rs.pipe(csvStream).pipe(filterStream);

filterStream.on('data', onFilteredItem);
filterStream.on('end', onFinished);
"use strict";
var assert = require('assert');
var stream = require('stream');
var util = require('util');
var Transform = stream.Transform;
/**
 * Object-mode Transform stream that forwards only the chunks accepted by a
 * user supplied predicate.
 *
 * @param {Object} options - options handed to `stream.Transform`; `objectMode`
 *   is always forced to `true`. The object passed in is not modified.
 * @param {Function} options.filter - called with each chunk (with the stream
 *   as `this`); the chunk is emitted when it returns a truthy value.
 * @returns {FilterStream} the new stream (also when called without `new`).
 * @throws {AssertionError} when `options.filter` is not a function.
 */
function FilterStream(options) {
  var opts = options || {};
  var streamOptions = {};
  var key;

  // allow use without new
  if (!(this instanceof FilterStream)) {
    return new FilterStream(opts);
  }

  assert(typeof opts.filter === 'function', 'Must provide filter function');

  // work on a copy so that forcing objectMode does not leak into the
  // caller's options object
  for (key in opts) {
    streamOptions[key] = opts[key];
  }
  streamOptions.objectMode = true;

  this.filter = opts.filter;
  Transform.call(this, streamOptions);
}

util.inherits(FilterStream, Transform);

/**
 * Passes `chunk` downstream when the filter accepts it, drops it otherwise.
 *
 * @param {*} chunk - the incoming chunk, handed to the filter as-is.
 * @param {string} enc - encoding supplied by the stream machinery; passed
 *   through to `push`.
 * @param {Function} cb - called once the chunk has been handled; receives the
 *   error if the filter threw.
 */
FilterStream.prototype._transform = function (chunk, enc, cb) {
  var keep;

  // report a throwing filter through the callback so it reaches the stream's
  // error handling instead of escaping synchronously from the write call
  try {
    keep = this.filter(chunk);
  } catch (err) {
    return cb(err);
  }

  // emit data if filter returns truthy value
  if (keep) {
    this.push(chunk, enc);
  }

  cb();
};
module.exports = FilterStream;
{
"name": "05-streams",
"version": "0.0.0",
"description": "",
"main": "example.js",
"dependencies": {
"bubble-stream-error": "0.0.1",
"cli-table": "~0.3.1",
"csv-parser": "~1.4.6"
},
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "Alexandru Vladutu <alexandru.vladutu@gmail.com> (http://careers.stackoverflow.com/alessioalex)",
"license": "ISC"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment