Skip to content

Instantly share code, notes, and snippets.

@mzalazar
Created October 25, 2017 17:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mzalazar/e5250c736e39b41616f19a8fb2b63abf to your computer and use it in GitHub Desktop.
Save mzalazar/e5250c736e39b41616f19a8fb2b63abf to your computer and use it in GitHub Desktop.
var Writable = require('stream').Writable;
var util = require('util');
var redis = require('./redis');
var async = require('async');
module.exports = RedisWriteStream;
function RedisWriteStream(client_id, broadcast_id, options) {
if (!(this instanceof RedisWriteStream)) return new RedisWriteStream(options);
if (!options) options = {};
options.objectMode = true;
Writable.call(this, options);
}
util.inherits(RedisWriteStream, Writable);
RedisWriteStream.prototype._write = function write(doc, encoding, callback) {
doc = JSON.parse(JSON.stringify(doc));
// console.log('Vamos a escribir esto:');
// console.log(doc);
var prefix = 'client_id:' + doc.client_id + ':broadcast_id:' + doc.broadcast_id + ':';
// client_id:xx:broadcast_id:xx:SPOOLED_TOTAL (id_subscriber list)
redis.sadd(prefix + 'SPOOLED_TOTAL', doc.id_subscriber, function (err, response) {
if (err) return callback(err);
// Get rid of client_id and broadcast_id
delete doc['client_id'];
delete doc['broadcast_id'];
redis.sadd(prefix + 'SPOOLED', JSON.stringify(doc), function (err, response) {
return callback(err);
});
});
};
// Create WRITE STREAM
var RedisWriteStream = require('./redis_write_stream');
var rdb = new RedisWriteStream();
var highWaterMark = 10; // How many records will hold our memory ;-)
// Create READ STREAM (Get subscriber and its configured fields)
var stream = knex.select(knex.raw(broadcast_id +' AS broadcast_id')).select(knex.raw(client_id +' AS client_id')).select(field_list).from('subscribers').where(knex.raw('list_id = ?', [list_id])).stream({highWaterMark:highWaterMark});
// Connect both streams
stream.pipe(rdb);
// Finishing this loop...
rdb.on('finish', function () {
console.log('EVENT: "finish" was triggered in stream writer.'.yellow);
});
rdb.on('error', function (err) {
console.log('EVENT: "error" was triggered in stream writer.'.yellow);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment