Create a gist now

Instantly share code, notes, and snippets.

@laser /streams.js
Last active Feb 6, 2017

What would you like to do?
Socket.IO + Bacon.js
var request = require('request-json'),
express = require('express'),
app = express(),
http = require('http').Server(app),
io = require('socket.io'),
Bacon = require('baconjs').Bacon,
client = request.newClient('http://localhost:3000/api/'),
db = require('mongodb').MongoClient;
///////////////////////////////////////////////////////////
// this is all boilerplate; scroll down for demo code
//
app.get('/', function(req, res) {
res.sendFile('./static/index.html');
});
app.post('/api/log', function(req, res) {
var x = ((Math.random() * 10) > 5) ? "NOT\nJSON" : true;
res.send(x);
});
app.get('/api/weather', function(req, res) {
res.send({
"main": {
"temp": (Math.round(Math.random() * 1000) / 10)
},
"weather": [{
"main": "Cloudy"
}]
});
});
app.use(express.static('./static'));
http.listen(3000, function() {
console.log(new Date(), 'started server');
});
io = io(http);
function getFunFact(callback) {
client.connect('mongodb://127.0.0.1:27017/library', function(err, db) {
db.collection('facts').findOne(callback);
});
}
///////////////////////////////////////////////////////////
// demo impl below
//
var connections = Bacon.fromBinder(function(sink) {
io.on('connection', sink)
});
var messages = connections.flatMap(function(socket) {
return Bacon.fromBinder(function(sink) {
socket.on('message', function(txt) {
sink({ author: socket, txt: txt });
});
});
});
var currentMessages = messages
.scan([], function(acc, message) {
return acc.length === 20 ? [] : acc.concat(message);
});
var funFact = currentMessages
.filter(function(acc) {
return acc.length === 20;
})
.flatMap(function() {
return Bacon.fromNodeCallback(getFunFact);
})
.map(function(fact) {
return "Did you know that: " + fact.text;
});
var entries = currentMessages
.filter(function(acc) {
return acc.length === 20;
})
.flatMap(function(messages) {
return Bacon.retry({
retries: 10,
delay: function() { return 100; },
source: function() {
return Bacon.fromNodeCallback(client, 'post', 'log', {
messages: messages
});
}
});
});
connections.onValue(function(socket) {
socket.broadcast.emit('message', 'CONN: ' + socket.id);
});
messages.onValue(function(message) {
io.emit('message', '' + message.author.id + ': ' + message.txt);
});
funFact.onValue(function(fact) {
io.emit('message', fact);
});
entries.onValue(function() {
// deliberate no-op
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment