Skip to content

Instantly share code, notes, and snippets.

@njcaruso
Last active October 27, 2021 04:17
Show Gist options
  • Save njcaruso/ffa81dfbe491fcb8f176 to your computer and use it in GitHub Desktop.
Save njcaruso/ffa81dfbe491fcb8f176 to your computer and use it in GitHub Desktop.
Custom LoopBack change-stream that only shows stream updates for the logged-in user. The intent is to have a `Message` model that contains private messages for users. Although the current change-stream implementation accepts an `options.where` clause, it does not actually use it. See https://github.com/strongloop/angular-live-set/issues/11.
// Client side: open an EventSource on the per-user stream endpoint,
// authenticating with the access token passed as a query parameter.
var url = '/api/messages/stream-updates?access_token=' + LoopBackAuth.accessTokenId;
var src = new EventSource(url);
// createChangeStream is expected to be provided by the surrounding code
// (presumably angular-live-set); it wraps the event source for LiveSet.
var changes = createChangeStream(src);
var set;

// Query for the five most recently created messages of the logged-in user.
var latestOwnMessages = {
  filter: {
    where: {userId: LoopBackAuth.currentUserId},
    order: 'created DESC',
    limit: 5
  }
};

// Seed the live set with the initial query results and expose it to the view.
function showLiveMessages(results) {
  set = new LiveSet(results, changes);
  vm.messages = set.toLiveArray();
}

Message.find(latestOwnMessages).$promise.then(showLiveMessages);
'use strict';
var PassThrough = require('stream').PassThrough;
var loopback = require('loopback');
/**
* The Message model
* @class Message
* @header Message object
*/
module.exports = function(messageModel) {
// Workaround for https://github.com/strongloop/loopback/issues/292
messageModel.streamUpdates = function(userId, cb) {
/* Based on persisted-model#createChangeStream
*
* currentUser is being populated in server.js using tips from here:
* https://github.com/strongloop/loopback/issues/569
*
* Ignoring paramter userId, and defaulting to logged in user
* future improvement will check if user has role 'admin', and if so
* allow the userId to not match the logged in user
*
*/
var currentUser = loopback.getCurrentContext().get('currentUser');
userId = currentUser.id.toString();
/* (NJC) apart from one line below with NJC checking matching userId,
* no other changes made from source
*/
var idName = this.getIdName();
var Model = this;
var changes = new PassThrough({objectMode: true});
var writeable = true;
changes.destroy = function() {
changes.removeAllListeners('error');
changes.removeAllListeners('end');
writeable = false;
changes = null;
};
changes.on('error', function() {
writeable = false;
});
changes.on('end', function() {
writeable = false;
});
process.nextTick(function() {
cb(null, changes);
});
Model.observe('after save', createChangeHandler('save'));
Model.observe('after delete', createChangeHandler('delete'));
function createChangeHandler(type) {
return function(ctx, next) {
// since it might have set to null via destroy
if (!changes) {
return next();
}
var where = ctx.where;
var data = ctx.instance || ctx.data;
/* (NJC) validate userId matches */
if (data.userId.toString() !== userId) {
return next();
}
var whereId = where && where[idName];
// the data includes the id
// or the where includes the id
var target;
if (data && (data[idName] || data[idName] === 0)) {
target = data[idName];
} else if (where && (where[idName] || where[idName] === 0)) {
target = where[idName];
}
var hasTarget = target === 0 || !!target;
var change = {
target: target,
where: where,
data: data
};
switch (type) {
case 'save':
if (ctx.isNewInstance === undefined) {
change.type = hasTarget ? 'update' : 'create';
} else {
change.type = ctx.isNewInstance ? 'create' : 'update';
}
break;
case 'delete':
change.type = 'remove';
break;
}
// TODO(ritch) this is ugly... maybe a ReadableStream would be better
if (writeable) {
changes.write(change);
}
next();
};
}
};
messageModel.remoteMethod(
'streamUpdates', {
description: 'stream updated messages of the logged in user.',
accessType: 'READ',
accepts: {
arg: 'userId',
type: 'string'
},
http: {
verb: 'get',
path: '/stream-updates'
},
returns: {
arg: 'changes',
type: 'ReadableStream',
json: true
}
}
);
};
{
"name": "Message",
"plural": "messages",
"base": "PersistedModel",
"properties": {
"id": {
"type": "string",
"id": true,
"generated": true
},
"created": {
"type": "Date"
},
"message": {
"type": "string",
"required": true
},
"messageType": {
"type": "string",
"required": true
}
},
"validations": [],
"relations": {
"user": {
"type": "belongsTo",
"model": "user",
"foreignKey": "userId"
}
},
"acls": [
{
"principalType": "ROLE",
"principalId": "$everyone",
"permission": "DENY"
},
{
"principalType": "ROLE",
"principalId": "$authenticated",
"permission": "ALLOW",
"property": "streamUpdates",
"accessType": "EXECUTE"
}
],
"methods": []
}
// snippet from server.js to put the user into the context:
// https://github.com/strongloop/loopback/issues/292
app.use(loopback.context());
app.use(loopback.token());

// Look up the user referenced by the request's access token and publish it
// as `currentUser` on res.locals and, when available, on the LoopBack context.
app.use(function attachCurrentUser(req, res, next) {
  var token = req.accessToken;
  // Requests without an access token pass through untouched.
  if (!token) {
    return next();
  }

  app.models.User.findById(token.userId, function onUserLoaded(err, user) {
    if (err) {
      return next(err);
    }
    if (!user) {
      return next(new Error('No user with this access token was found.'));
    }

    res.locals.currentUser = user;

    // The context may be unavailable, so only set the user when it exists.
    var ctx = loopback.getCurrentContext();
    if (ctx) {
      ctx.set('currentUser', user);
    }

    next();
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment