Skip to content

Instantly share code, notes, and snippets.

@rhyslbw
Last active August 29, 2015 14:23
Show Gist options
  • Save rhyslbw/a04fb872064116899d68 to your computer and use it in GitHub Desktop.
Save rhyslbw/a04fb872064116899d68 to your computer and use it in GitHub Desktop.
Proof of concept for Job Collection
/**
*
* @constructor
* @param {string} name A name for the queue instance
* @param {Object} options - options.driver String Alternative Mongo Url, options.customIndexes Object, options.setLogStream String, options.autoStart Boolean, options.security Object, options.jobsPublish Object, options.queues Array,
* @returns {undefined}
* @public
*/
JobCollectionManager.Queue = function(args){
var self = this;
if (!args) {
throw new Error('JobCollectionManager.Queue options argument is required');
}
if (!args.name) {
throw new Error('JobCollectionManager.Queue options must specify name');
}
var options = args.options || {};
self.name = args.name;
// used by pub/sub
self._streamName = 'jcm.' + self.name;
var serverOnly = args.options.serverOnly;
if(serverOnly){
}
if(Meteor.isServer && serverOnly.driver){
self._driver = new MongoInternals.RemoteCollectionDriver(serverOnly.driver);
self.collection = new JobCollection(self.name, self.driver);
} else {
self.collection = new JobCollection(self.name);
}
if(Meteor.isServer) {
// Set any custom indexes if collection is empty
if(self.collection.find().count() === 0 && serverOnly.customIndexes) {
self.collection._ensureIndex(serverOnly.customIndexes);
// TODO: Set to proper log
console.info('Indexes set for ' + self.name);
}
if(serverOnly.setLogStream)
self.collection.setLogStream(serverOnly.setLogStream);
if(serverOnly.security) {
if(serverOnly.security.allow) {
self.collection.allow(serverOnly.security.allow);
}
if(serverOnly.security.deny) {
self.collection.deny(serverOnly.security.deny);
}
}
if(serverOnly.jobsPublish){
Meteor.publish(self._streamName, serverOnly.jobsPublish.func);
}
if(serverOnly.autoStart !== false)
self.collection.startJobServer();
}
if(options.workers){
self.workerGroups = [];
Meteor.startup(function(){
options.workers.forEach(function(w){
// TODO: Set polling interval to 1000000000 if q.options.type === 'observer'
var workers = self.collection.processJobs(w.tasks, w.options, w.func);
console.log(workers.running());
if(w.type === 'observer'){
console.log(self.collection.find({ type: w.tasks, status: 'ready' }) // TODO: handle arrays
.observe({
added: function() {
workers.trigger();
}
}));
} else if(w.options.autoStart === false) {
workers.pause();
}
self.workerGroups.push(workers);
});
console.log('workerGroups', self.workerGroups);
});
}
}
// An JobCollectionManager.Queue can emit events
JobCollectionManager.Queue.prototype = new EventEmitter();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment