Skip to content

Instantly share code, notes, and snippets.

@kirbysayshi
Created June 10, 2011 22:25
Show Gist options
  • Save kirbysayshi/1019906 to your computer and use it in GitHub Desktop.
Save kirbysayshi/1019906 to your computer and use it in GitHub Desktop.
Consume Twitter's streaming API (the statuses/filter endpoint) using Node.js
// Twitter credentials in "username:password" form. The client script
// base64-encodes this string into an HTTP Basic authorization header.
// Replace the placeholder locally; keep real credentials out of version control.
module.exports.userpass = 'YOURUSERNAME:YOURPASSWORD';
// Example client: streams tweets matching "google" through StreamConsumer and
// prints periodic throughput statistics and any errors to the console.
var StreamConsumer = require('./StreamConsumer').StreamConsumer;
var credentials = require('./credentials.js');

// 'tweet' listener. Printing is disabled; uncomment the line below to dump the
// raw JSON line of every tweet.
function onTweet(tweetObj, tweetStr){
  // console.log(tweetStr);
}

// 'error' listener: print a banner followed by the error value.
function onError(tweetStr, ex, currentString){
  console.log('******* ERROR *******');
  console.log(ex);
}

// 'stats' listener: print the periodic statistics object as-is.
function onStats(stats){
  console.log(stats);
}

var sc = new StreamConsumer({
  headers: {
    authorization: 'Basic ' + new Buffer(credentials.userpass).toString('base64'),
    'user-agent': 'twitter-stream-filter-test-nodejs',
    'content-type': 'application/x-www-form-urlencoded; charset=utf-8'
  },
  body: 'track=google'
});

sc.on('tweet', onTweet);
sc.on('error', onError);
sc.on('stats', onStats);

sc.makeSingleRequest();
var events = require('events')
,util = require('util')
,_ = require('underscore')
,http = require('http')
/**
 * Streams CRLF-delimited JSON from an HTTP endpoint (by default Twitter's
 * /1/statuses/filter.json) and re-emits it as events on this EventEmitter.
 *
 * @param {Object} [options] Options handed to http.request, plus `body`, the
 *   string written as the request body. Keys missing from `options` are filled
 *   from the defaults below. The merge is shallow: a caller-supplied `headers`
 *   object replaces the default headers entirely. The caller's object is
 *   copied, never modified, and may be omitted altogether.
 */
function StreamConsumer(options){
  events.EventEmitter.call(this);
  // Matches one non-empty, CRLF-terminated line of the stream.
  this.reEot = /.+\r\n/gm;
  // Request created by makeSingleRequest(); null before the first request.
  // makeSingleRequest() does nothing while this is non-null.
  this.req = null;
  // Millisecond timestamps of the response start and of the current stats sample.
  this.reqStart = 0;
  this.sampleStart = 0;
  // When true, makeSingleRequest() logs every raw line to the console.
  this.debug = false;
  // Lines seen since the response started / since the last 'stats' event.
  this.totalTweetCount = 0;
  this.intervalTweetCount = 0;
  // Number of lines between two 'stats' events.
  this.samplingInterval = 25;
  // Number of lines that failed JSON.parse.
  this.errorCount = 0;
  // Merge into a fresh object: _.defaults writes into its first argument, so
  // passing `options` directly would mutate the caller's object (and throw
  // when no options were given).
  this.options = _.defaults({}, options, {
    host: 'stream.twitter.com'
    ,port: 80
    ,path: '/1/statuses/filter.json'
    ,method: 'POST'
    ,headers: {
      // Placeholder credentials; callers are expected to pass their own headers.
      authorization: 'Basic ' + new Buffer('YOURUSERNAME:YOURPASSWORD').toString('base64')
      ,'user-agent': 'twitter-stream-nodejs-test'
      ,'content-type': 'application/x-www-form-urlencoded; charset=utf-8'
    }
    ,body: 'track=google\n'
  });
}
util.inherits(StreamConsumer, events.EventEmitter);
/**
 * Opens one streaming request (http.request with this.options, writing
 * this.options.body) and turns the response into events. Does nothing while a
 * previous request is still active (this.req !== null).
 *
 * The UTF-8 response body is split on "\r\n". Every non-blank line counts
 * toward the statistics and is parsed as JSON.
 *
 * Events emitted on this instance:
 *   'tweet' (parsed, rawLine)    a line parsed to a non-null JSON value;
 *                                rawLine keeps its trailing "\r\n".
 *   'error' (rawLine, err, buf)  a line failed JSON.parse; buf is the
 *                                unconsumed buffer starting at that line.
 *   'error' (null, err, buf)     the request or the response emitted 'error'.
 *   'end'   (args)               the response ended; args is the arguments
 *                                object of the response's 'end' handler.
 *
 * this.req is reset to null on a request/response error, and on 'end' before
 * 'end' is re-emitted, so a listener may call makeSingleRequest() again to
 * reconnect. As with any EventEmitter, an 'error' with no listener throws.
 */
StreamConsumer.prototype.makeSingleRequest = function(){
  var self = this
    // Text received but not yet terminated by "\r\n" (a partial line).
    ,pending = '';

  if(self.req !== null){
    return;
  }

  // Forward transport failures with the same (line, error, buffer) signature
  // as parse errors and free the request slot. Without an 'error' listener on
  // the request, a connection failure would throw and crash the process.
  function onTransportError(err){
    self.req = null;
    self.emit('error', null, err, pending);
  }

  // Count, parse and dispatch one complete raw line (including its "\r\n").
  function handleLine(rawLine, buffered){
    var parsed = null;
    self._tweetCountPlusOne();
    if(self.debug){
      console.log('-----------------------------');
    }
    try {
      parsed = JSON.parse(rawLine);
    } catch(e){
      self.errorCount += 1;
      self.emit('error', rawLine, e, buffered);
    }
    if(self.debug){
      console.log(rawLine);
    }
    if(parsed !== null){
      self.emit('tweet', parsed, rawLine);
    }
  }

  self.req = http.request(self.options, function(res){
    self.reqStart = self.sampleStart = +new Date();
    res.setEncoding('utf8');
    res.on('data', function(chunk){
      var eol, rawLine, buffered;
      pending += chunk;
      while((eol = pending.indexOf('\r\n')) !== -1){
        buffered = pending;
        rawLine = pending.slice(0, eol + 2);
        // Consume the line before emitting anything, so a listener that throws
        // cannot leave it in the buffer to be processed a second time.
        pending = pending.slice(eol + 2);
        // Blank lines (e.g. keep-alive newlines) carry no JSON; dropping them
        // here keeps them from piling up in the buffer.
        if(rawLine.trim() === ''){
          continue;
        }
        handleLine(rawLine, buffered);
      }
    });
    res.on('error', onTransportError);
    res.on('end', function(){
      self.req = null;
      self.emit('end', arguments);
    });
  });
  self.req.on('error', onTransportError);
  self.req.write(self.options.body);
  self.req.end();
};
/**
 * Placeholder for a managed connection. Not implemented: calling it has no
 * effect and returns undefined.
 */
StreamConsumer.prototype.connect = function(){
  // TODO: handle connection errors, retries, throttling, etc.
  return undefined;
};
/**
 * Records one received line. Every `samplingInterval` lines it emits a
 * 'stats' event and starts a new sample.
 *
 * The 'stats' payload contains:
 *   sampleAvg    lines/second over the current sample (truncated to an integer)
 *   average      lines/second since reqStart (truncated to an integer)
 *   totalTweets  lines seen since the response started
 *   runningTime  whole seconds since reqStart
 *   errors       this.errorCount
 *   memory       process.memoryUsage()
 */
StreamConsumer.prototype._tweetCountPlusOne = function(){
  var now, sampleMs, totalMs;

  this.totalTweetCount += 1;
  this.intervalTweetCount += 1;
  if(this.intervalTweetCount < this.samplingInterval){
    return;
  }

  now = +new Date();
  // Many lines can arrive in one data chunk and be counted within the same
  // millisecond. A zero elapsed time would give n/0 = Infinity, which `| 0`
  // silently turns into a rate of 0, so clamp elapsed time to at least 1 ms.
  sampleMs = Math.max(now - this.sampleStart, 1);
  totalMs = Math.max(now - this.reqStart, 1);

  this.emit('stats', {
    sampleAvg: (this.intervalTweetCount / sampleMs * 1000) | 0
    ,average: (this.totalTweetCount / totalMs * 1000) | 0
    ,totalTweets: this.totalTweetCount
    ,runningTime: ((now - this.reqStart) / 1000) | 0
    ,errors: this.errorCount
    ,memory: process.memoryUsage()
  });
  this.intervalTweetCount = 0;
  this.sampleStart = now;
};
// Public API of this module: the StreamConsumer constructor.
module.exports.StreamConsumer = StreamConsumer;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment