Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created May 30, 2012 06:01
Show Gist options
  • Save mattpodwysocki/2834028 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/2834028 to your computer and use it in GitHub Desktop.
var Rx = require('./rx.node')
, twitter = require('ntwitter')
, credentials = require('./credentials')
, io = require('socket.io').listen(80);
/**
 * Creates a forwarding function that invokes `self[method]` with `self` as
 * the receiver, passing along whatever arguments it was called with.
 *
 * @param {Object} self - Object that owns the method and serves as `this`.
 * @param {string} method - Name of the property on `self` to invoke.
 * @returns {Function} Function returning the result of `self[method](...)`.
 */
var customBind = function (self, method) {
  var forward = function () {
    // The property is looked up on every call, so a method that is replaced
    // on `self` after binding is still picked up.
    var target = self[method];
    return target.apply(self, arguments);
  };
  return forward;
};
// Turn twitter into stream
/**
 * Exposes a streaming endpoint of this twitter client as an Rx observable.
 *
 * On subscription the client's `stream(url, data, callback)` is invoked and
 * the resulting stream's events are forwarded to the observer:
 * 'data' -> onNext, 'end' and 'destroy' -> onCompleted, 'error' -> onError.
 * Disposing the subscription destroys the underlying stream.
 *
 * @param {string} url - Streaming endpoint, e.g. 'statuses/sample'.
 * @param {Object} [data] - Parameters passed through to `stream`.
 * @returns {Object} Observable produced by `Rx.Observable.create`.
 */
twitter.prototype.streamAsObservable = function (url, data) {
  // Use the instance the method was called on rather than a module-level
  // client, so the method works for every twitter instance.
  var client = this;

  return Rx.Observable.create(function (observer) {
    var outerStream;
    var disposed = false;

    client.stream(url, data, function (stream) {
      if (disposed) {
        // The subscriber left before the stream was handed over; release it
        // right away instead of leaking it.
        stream.destroy();
        return;
      }

      outerStream = stream;

      // Handle events
      var onCompleted = customBind(observer, 'onCompleted');
      stream.on('data', customBind(observer, 'onNext'));
      stream.on('end', onCompleted);
      stream.on('destroy', onCompleted);
      stream.on('error', customBind(observer, 'onError'));
    });

    // Destroy upon exiting
    return function () {
      disposed = true;
      if (outerStream) {
        outerStream.destroy();
      }
    };
  });
};
// Client built from the credentials module; the sample stream it exposes is
// the source for everything below.
var twit = new twitter(credentials);

// Key selector for the grouping: the language set on the tweet's author.
var languageOfTweet = function (tweet) {
  var author = tweet.user;
  return author.lang;
};

var tweetStream = twit.streamAsObservable('statuses/sample');

// One group per user language.
var groupedTweetStream = tweetStream.groupBy(languageOfTweet);
// For every client that connects: subscribe to the grouped tweet stream, keep
// a running tweet count per language, and push each update to that client.
io.sockets.on('connection', function (socket) {
  // Every subscription opened on behalf of this client (the outer grouping
  // plus one per language group), kept so they can be released on disconnect.
  var subscriptions = [];

  // Get grouping
  subscriptions.push(groupedTweetStream.subscribe(function (groupedTweet) {
    // Aggregate tweet count based upon language
    var runningCount = groupedTweet.scan({ count: 0, key: groupedTweet.key }, function (acc, x) {
      return { key: groupedTweet.key, count: acc.count + 1, text: x.text };
    });

    subscriptions.push(runningCount.subscribe(function (status) {
      // Emit data to this client only. The subscription is per connection, so
      // broadcasting through io.sockets would deliver every update once per
      // connected client to every client.
      socket.emit('message', {
        languageCode: status.key,
        tweetCount: status.count,
        text: status.text
      });
    }));
  }, function (err) {
    socket.emit('error', err);
  }));

  // Release this client's subscriptions once it goes away; otherwise they
  // keep running for the lifetime of the process.
  socket.on('disconnect', function () {
    subscriptions.forEach(function (subscription) {
      if (subscription && typeof subscription.dispose === 'function') {
        subscription.dispose();
      }
    });
    subscriptions.length = 0;
  });
});
@deanrad
Copy link

deanrad commented Jan 3, 2016

Very nice!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment