- first install the dependencies:
npm i
- download the earthquake data file locally (it is saved as earthquakes.csv, which example.js reads):
curl http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv > earthquakes.csv
- run the example:
node example.js
"use strict"; | |
// CSV data downloaded from the following url: | |
// http://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv | |
var fs = require('fs'); | |
var csv = require('csv-parser'); | |
var FilterStream = require('./filter-stream'); | |
var Table = require('cli-table'); | |
var bubbleError = require('bubble-stream-error').bubble; | |
// Table (Place / Depth / Time columns, cyan headers) used by the alternative,
// tabular presentation of the matches. That presentation is currently
// disabled: see the commented-out code in the 'data' and 'end' handlers below.
var table = new Table({
  head: ['Place', 'Depth', 'Time'],
  style: {
    head: ['cyan']
  }
});

// Filtering stream that only lets through the earthquakes whose place mentions
// "New Guinea" (case-insensitive) and whose depth, parsed as a base-10 integer,
// is greater than 80.
var filterStream = new FilterStream({
  filter: function(item) {
    // test() yields a strict boolean, whereas exec() yields a match array or null.
    return /New Guinea/i.test(item.place) && parseInt(item.depth, 10) > 80;
  }
});

var rs = fs.createReadStream(__dirname + '/earthquakes.csv');
var csvStream = csv();

// NOTE(review): presumably forwards 'error' events from the file reader and the
// CSV parser to filterStream so the single handler below sees them all --
// confirm against the bubble-stream-error documentation.
bubbleError(filterStream, [rs, csvStream]);

// Fail loudly: any stream error aborts the example.
filterStream.on('error', function(err) { throw err; });

rs
  .pipe(csvStream)
  .pipe(filterStream)
  .on('data', function(item) {
    // Print every matching row as it arrives.
    console.log(item);

    // Alternative presentation: collect the rows in the table instead.
    // table.push([
    //   item.place,
    //   item.depth,
    //   item.time
    // ]);
  })
  .on('end', function() {
    // Alternative presentation: print the collected table once the input ends.
    // console.log('\n');
    // console.log(table.toString());
    // console.log('\n');
  });
"use strict"; | |
var assert = require('assert'); | |
var stream = require('stream'); | |
var util = require('util'); | |
var Transform = stream.Transform; | |
/**
 * Object-mode Transform stream that forwards only the chunks accepted by a
 * caller-supplied predicate.
 *
 * May be called with or without `new`.
 *
 * @param {Object} options - Stream options. A copy of them is handed on to
 *   `Transform` with `objectMode` forced to `true`; the object passed in by
 *   the caller is left untouched.
 * @param {Function} options.filter - Called with each chunk; a truthy return
 *   value keeps the chunk, a falsy one drops it.
 * @throws {AssertionError} If `options.filter` is not a function.
 */
function FilterStream(options) {
  var streamOptions = {};
  var key;

  // allow use without new
  if (!(this instanceof FilterStream)) {
    return new FilterStream(options);
  }

  // Work on a copy so that forcing objectMode never modifies the caller's object.
  options = options || {};
  for (key in options) {
    streamOptions[key] = options[key];
  }
  streamOptions.objectMode = true;

  assert(typeof streamOptions.filter === 'function', 'Must provide filter function');

  this.filter = streamOptions.filter;
  Transform.call(this, streamOptions);
}

util.inherits(FilterStream, Transform);

/**
 * Transform implementation: pushes the chunk downstream when the filter
 * accepts it and drops it otherwise.
 *
 * @param {*} chunk - Item written to the stream.
 * @param {string} enc - Encoding supplied by the stream machinery.
 * @param {Function} cb - Completion callback; receives the error when the
 *   filter function throws.
 */
FilterStream.prototype._transform = function (chunk, enc, cb) {
  var keep;

  // Hand a throwing filter to the callback so the failure is reported through
  // the stream instead of escaping synchronously from the writer.
  try {
    keep = this.filter(chunk);
  } catch (err) {
    return cb(err);
  }

  // emit data if filter returns truthy value
  if (keep) {
    this.push(chunk, enc);
  }

  cb();
};
module.exports = FilterStream; |
{
  "name": "05-streams",
  "version": "0.0.0",
  "description": "",
  "main": "example.js",
  "dependencies": {
    "bubble-stream-error": "0.0.1",
    "cli-table": "~0.3.1",
    "csv-parser": "~1.4.6"
  },
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "Alexandru Vladutu <alexandru.vladutu@gmail.com> (http://careers.stackoverflow.com/alessioalex)",
  "license": "ISC"
}