Skip to content

Instantly share code, notes, and snippets.

@veered
Last active May 8, 2019 21:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save veered/33c1321e21790784fd12a14fb28f5ccc to your computer and use it in GitHub Desktop.
Save veered/33c1321e21790784fd12a14fb28f5ccc to your computer and use it in GitHub Desktop.
/**
 * SubsManager: a caching, reference-counted layer on top of `Meteor.subscribe`.
 *
 * Calls with the same user, publication name and arguments share a single raw
 * Meteor subscription. Every caller gets its own handle (a "reference"); the
 * raw subscription is only stopped once no references remain and the
 * debounced expiry delay has elapsed.
 */
let SubsManager = {
  // Expiry delay (ms) that `subscribe` passes to `subscribeFor`.
  defaultExpireTime: 1000 * 60 * 1,

  // Cache of multiplexed subscriptions, keyed by `hashSub(name, args)`.
  subscriptions: {},

  /**
   * Subscribe (see `subscribe`) and return a promise for the handle.
   *
   * @param {...*} args - Publication name followed by its arguments.
   * @returns {Promise<Object>} Resolves with the subscription handle once the
   *   subscription is ready; rejects with the error if the subscription is
   *   stopped with an error.
   */
  subscribePromise(...args) {
    return new Promise((resolve, reject) => {
      let sub;
      const callbacks = {
        onReady: () => {
          if (sub) {
            resolve(sub);
          } else {
            // If this subscription is cached and already ready, onReady is
            // invoked synchronously from inside `this.subscribe`, i.e. before
            // `sub` has been assigned. Defer so the handle exists by then.
            Meteor.defer(() => resolve(sub));
          }
        },
        onStop: (error) => {
          // Without this the promise would stay pending forever when the
          // subscription fails. Rejecting after a resolve is a no-op.
          if (error) {
            reject(error);
          }
        },
      };
      sub = this.subscribe(...args, callbacks);
    });
  },

  /**
   * Subscribe using `defaultExpireTime` as the expiry delay.
   *
   * @param {...*} args - Publication name, its arguments, and optionally a
   *   trailing onReady function or `{onReady, onStop}` object.
   * @returns {Object} Subscription handle (see `touch`).
   */
  subscribe(...args) {
    return this.subscribeFor(this.defaultExpireTime, ...args);
  },

  /**
   * Subscribe with an explicit expiry delay.
   *
   * @param {...*} rawArgs - Expiry delay in ms, publication name, publication
   *   arguments, and optionally trailing callbacks.
   * @returns {Object} Subscription handle (see `touch`).
   */
  subscribeFor(...rawArgs) {
    const args = this.parseArgs(rawArgs);
    const hash = this.hashSub(args.subName, args.subArgs);
    const sub =
      this.subscriptions[hash] ||
      this.createSub(args.subName, args.subArgs, args.expireTime);
    return sub.touch(args.callbacks);
  },

  /**
   * Split a raw argument list into its parts.
   *
   * @param {ArrayLike} args - `[expireTime, subName, ...subArgs, callbacks?]`.
   *   The input is not modified.
   * @returns {{expireTime: *, subName: *, callbacks: {onReady: ?Function,
   *   onStop: ?Function}, subArgs: Array}}
   */
  parseArgs(args) {
    const rest = Array.from(args);
    const parsed = {
      expireTime: rest.shift(),
      subName: rest.shift(),
      callbacks: {
        onReady: null,
        onStop: null,
      },
    };

    // A trailing function is the onReady callback; a trailing object with
    // function-valued onReady/onStop supplies callbacks. In both cases the
    // trailing argument is not forwarded to the publication.
    const lastArg = rest[rest.length - 1];
    if (lastArg) {
      if (_.isFunction(lastArg)) {
        parsed.callbacks.onReady = lastArg;
      }
      if (_.isFunction(lastArg.onReady)) {
        parsed.callbacks.onReady = lastArg.onReady;
      }
      if (_.isFunction(lastArg.onStop)) {
        parsed.callbacks.onStop = lastArg.onStop;
      }
      if ([lastArg, lastArg.onReady, lastArg.onStop].some((x) => _.isFunction(x))) {
        rest.pop();
      }
    }

    // Everything left is an argument to the subscription.
    parsed.subArgs = rest;
    return parsed;
  },

  /**
   * Cache key for a subscription: derived from the current user id, the
   * publication name and its arguments. The user id is read non-reactively.
   * NOTE(review): `_.hashCode` is not part of stock Underscore; presumably a
   * project mixin. Confirm its collision behaviour is acceptable for a cache key.
   */
  hashSub(name, args) {
    const userID = Tracker.nonreactive(() => Meteor.userId());
    return _.hashCode(EJSON.stringify([userID, name, args]));
  },

  /**
   * Create a multiplexed subscription, start its raw Meteor subscription and
   * store it in `subscriptions`.
   *
   * @param {string} name - Publication name.
   * @param {Array} args - Publication arguments.
   * @param {number} expireTime - Debounce delay (ms) before the raw
   *   subscription is stopped once no references remain.
   * @returns {Object} The newly created subscription object.
   */
  createSub(name, args, expireTime) {
    // Set once the raw subscription's onStop has fired. Kept in the closure
    // (not on `sub`) so handles returned by `touch` do not copy a stale value.
    let rawStopped = false;

    const sub = {
      name,
      hash: this.hashSub(name, args),
      _ready: new ReactiveVar(false),
      ready: () => sub._ready.get(),
      _error: new ReactiveVar(),
      error: () => sub._error.get(),
      // Per-caller references, keyed by refID.
      references: {},

      // Callbacks handed to the raw Meteor subscription.
      callbacks: {
        onReady() {
          _.each(sub.references, (ref, refID) => {
            ref.onReady();
            // This reference asked to stop before the raw sub was ready.
            // Deferred so it runs after `_ready` is set below.
            if (ref._scheduledStop) {
              Meteor.defer(() => sub.stop(refID));
            }
          });
          sub._ready.set(true);
        },
        onStop(error) {
          if (error) {
            sub._error.set(error);
          }
          // The raw subscription is gone, so nothing is left to wait for:
          // references must be released now even if it never became ready.
          rawStopped = true;
          _.each(sub.references, (ref, refID) => {
            sub.stop(refID);
          });
          SubsManager.removeSub(sub.hash);
        },
      },

      // Create the underlying raw Meteor subscription (non-reactively).
      start() {
        sub.rawSub = Tracker.nonreactive(() =>
          Meteor.subscribe(name, ...args, sub.callbacks)
        );
        return sub;
      },

      /**
       * Register a new reference and return a handle for it: a copy of the
       * subscription object extended with `refID`.
       *
       * @param {{onReady: ?Function, onStop: ?Function}} [callbacks]
       */
      touch(callbacks = {}) {
        const { onReady, onStop } = callbacks || {};
        const refID = Random.id();

        // Record reference.
        // NOTE(review): `Blaze.wrap` is not a documented Blaze API; presumably
        // a project helper. Confirm what it binds.
        const ref = {
          onReady: onReady ? Blaze.wrap(onReady) : () => true,
          onStop: onStop ? Blaze.wrap(onStop) : () => true,
        };
        sub.references[refID] = ref;

        // Auto-stop if computation stops.
        let comp = Tracker.currentComputation;

        // If we are in a template and not already in an autorun,
        // use a template autorun as the wrapping computation.
        if (!comp && Template._currentTemplateInstanceFunc) {
          const tpl = Tracker.nonreactive(() => Template.instance());
          if (tpl) {
            comp = tpl.autorun(() => {});
          }
        }

        if (comp) {
          ref.compID = comp._id;
          comp.onInvalidate(() => sub.stop(refID));
          comp.onStop(() => sub.stop(refID));
        }

        // If the subscription is already ready, call onReady right away.
        // Both the readiness check and the callback run non-reactively:
        // reading `_ready` reactively here would make the caller's
        // computation depend on it, so the computation would be invalidated
        // (releasing this very reference) as soon as the sub became ready.
        Tracker.nonreactive(() => {
          if (sub.ready()) {
            ref.onReady();
          }
        });

        return _.extend({ refID }, sub);
      },

      /**
       * Release one reference.
       *
       * @param {string} [refID] - Reference to release; defaults to the
       *   `refID` of the handle this method is called on.
       */
      stop(refID) {
        // `this` is undefined when the method is invoked detached.
        const id = refID || (this ? this.refID : undefined);
        const ref = sub.references[id];
        if (!ref) {
          return;
        }

        // We use curValue to avoid creating a reactive dependency.
        const ready = sub._ready.curValue;
        const error = sub._error.curValue;

        // If the raw sub isn't ready yet (and hasn't errored or stopped),
        // hold off on stopping this reference until the raw sub is ready
        // (or errored); `callbacks.onReady` performs the scheduled stop.
        if (!ready && !error && !rawStopped) {
          ref._scheduledStop = true;
          return;
        }

        // Make sure that onStop doesn't create a reactive dependency.
        Tracker.nonreactive(() => (error ? ref.onStop(error) : ref.onStop()));
        delete sub.references[id];

        // Called on `sub` rather than `this` so a detached `stop` still works;
        // handles share this same debounced function.
        sub.delayedStop();
      },

      // Debounced so a subscription that is re-referenced within `expireTime`
      // keeps its raw subscription alive.
      delayedStop: _.debounce(() => sub.stopNow(), expireTime),

      // Stop the raw subscription if nothing references it any more.
      stopNow() {
        if (_.size(sub.references) === 0) {
          sub.rawSub.stop();
        }
      },
    };

    this.subscriptions[sub.hash] = sub.start();
    return sub;
  },

  // Drop a subscription from the cache.
  removeSub(hash) {
    delete this.subscriptions[hash];
  },
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment