Skip to content

Instantly share code, notes, and snippets.

@allupaku
Last active November 7, 2017 14:38
Show Gist options
  • Save allupaku/4139d4a366983568206a to your computer and use it in GitHub Desktop.
Save allupaku/4139d4a366983568206a to your computer and use it in GitHub Desktop.
This is a modified version for createChangeStream - because the default implementation as of today is not supporting where filter. This is using the same approach from persisted model and using loopback-filters for parsing the data which is been passed in. This may not be a perfect solution but a good workaround till we get the filters in core c…
// Angular Live Set should be added as a dependency, on the
// inject the change stream library in your main app script.
var module = angular.module("lbServices", ['ngResource','ls.ChangeStream']);
// ...
// To the end of auto generated code
// ....
this.$get = ['$resource','createChangeStream','LoopBackAuth', function($resource,createChangeStream,LoopBackAuth) {
return function(url, params, actions) {
var resource = $resource(url, params, actions);
actions.find.cache = true;
//work around for fixing the change stream and filters
// This will create a change stream and return teh change stream in the call back.
// there was a problem with the nodejs/express server disconnecting the streams after two minutes by default.
// fixed that issue by modifying and adding a middle ware as shown in the next file gist.
resource.buildChangeStream = function(options,cb,listener){
var src = new EventSource(actions.createChangeStream.url+'?_format=event-source&access_token='+LoopBackAuth.accessTokenId+'&options='+JSON.stringify(options));
if(listener && typeof listener === "function"){
src.addEventListener('data',listener);
}
var changes = createChangeStream(src);
cb(changes);
}
// Angular always calls POST on $save()
// This hack is based on
// http://kirkbushell.me/angular-js-using-ng-resource-in-a-more-restful-manner/
resource.prototype.$save = function(success, error) {
// Fortunately, LoopBack provides a convenient `upsert` method
// that exactly fits our needs.
var result = resource.upsert.call(this, {}, this, success, error);
return result.$promise || result;
};
// console.log("Actions when getting the resource");
// console.log(urlBase);
return resource;
};
}];
var applyFilter = require('loopback-filters');
var loopback = require('loopback');
var PassThrough = require('stream').PassThrough;
module.exports = function(Order) {
Order.createChangeStream = function(options,cb) {
/* Based on persisted-model#createChangeStream
*
* currentUser is being populated in server.js using tips from here:
* https://github.com/strongloop/loopback/issues/569
*
* Ignoring paramter userId, and defaulting to logged in user
* future improvement will check if user has role 'admin', and if so
* allow the userId to not match the logged in user
*
*/
var idName = this.getIdName();
var Model = this;
var changes = new PassThrough({objectMode: true});
var writeable = true;
changes.destroy = function() {
changes.removeAllListeners('error');
changes.removeAllListeners('end');
writeable = false;
changes = null;
};
changes.on('error', function() {
writeable = false;
});
changes.on('end', function() {
writeable = false;
});
process.nextTick(function() {
cb(null, changes);
});
Model.observe('after save', createChangeHandler('save',options));
Model.observe('after delete', createChangeHandler('delete',options));
function createChangeHandler(type,options) {
return function(ctx, next) {
// since it might have set to null via destroy
if (!changes) {
return next();
}
var where = ctx.where;
var data = ctx.instance || ctx.data;
var filtered = applyFilter([data], options);
if(filtered.length<1){
return next();
}
var whereId = where && where[idName];
// the data includes the id
// or the where includes the id
var target;
if (data && (data[idName] || data[idName] === 0)) {
target = data[idName];
} else if (where && (where[idName] || where[idName] === 0)) {
target = where[idName];
}
var hasTarget = target === 0 || !!target;
var change = {
target: target,
where: where,
data: data
};
switch (type) {
case 'save':
if (ctx.isNewInstance === undefined) {
change.type = hasTarget ? 'update' : 'create';
} else {
change.type = ctx.isNewInstance ? 'create' : 'update';
}
break;
case 'delete':
change.type = 'remove';
break;
}
// TODO(ritch) this is ugly... maybe a ReadableStream would be better
if (writeable) {
changes.write(change);
}
next();
};
}
};
}
// ...
// ...
app.middleware('routes:before', function (req, res, next) {
if (req.path.indexOf('change-stream') !== -1) {
res.setTimeout(24*3600*1000);
res.set('X-Accel-Buffering', 'no');
return next();
} else {
return next();
}
});
// ...
// ...
@jwebcat
Copy link

jwebcat commented Mar 22, 2017

how are you using this from the Angular controller? I have tried and it's not filtering per the options I pass it.

@nemes1s
Copy link

nemes1s commented Nov 7, 2017

Have you had any troubles with delivering data to event stream on instance change?
Sometimes i takes 3-4 model changes to get 1 record in EventStream on the client...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment