Last active
August 29, 2015 14:23
-
-
Save rhyslbw/a04fb872064116899d68 to your computer and use it in GitHub Desktop.
Proof of concept for Job Collection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A named job queue backed by a JobCollection.
 *
 * On the server the constructor can additionally apply custom indexes, a log
 * stream, allow/deny rules and a publication, and it starts the job server
 * unless told not to. On either side it can register worker groups, which are
 * created inside Meteor.startup.
 *
 * @constructor
 * @param {Object} args
 * @param {string} args.name - Queue name; also used to build the pub/sub
 *   stream name ('jcm.' + name).
 * @param {Object} [args.options]
 * @param {Object} [args.options.serverOnly] - Settings that are only read
 *   when Meteor.isServer is true.
 * @param {*} [args.options.serverOnly.driver] - Handed to
 *   MongoInternals.RemoteCollectionDriver (presumably an alternative Mongo
 *   URL - confirm against callers).
 * @param {Object} [args.options.serverOnly.customIndexes] - Passed to
 *   collection._ensureIndex, but only while the collection is empty.
 * @param {*} [args.options.serverOnly.setLogStream] - Passed to
 *   collection.setLogStream.
 * @param {Object} [args.options.serverOnly.security] - Optional `allow` and
 *   `deny` members, forwarded to collection.allow / collection.deny.
 * @param {Object} [args.options.serverOnly.jobsPublish] - Its `func` member
 *   is published under the stream name.
 * @param {boolean} [args.options.serverOnly.autoStart] - The job server is
 *   started unless this is exactly `false`.
 * @param {Array<Object>} [args.options.workers] - Worker definitions; each
 *   entry supplies `tasks`, `func`, and optionally `options` and `type`.
 *   Entries with type 'observer' are triggered when a ready job of a matching
 *   type is added; other entries are paused when `options.autoStart` is
 *   exactly `false`.
 * @throws {Error} If `args` or `args.name` is missing.
 * @public
 */
JobCollectionManager.Queue = function(args){
  var self = this;

  if (!args) {
    throw new Error('JobCollectionManager.Queue options argument is required');
  }
  if (!args.name) {
    throw new Error('JobCollectionManager.Queue options must specify name');
  }

  // Run the emitter constructor on this instance so listener state is
  // per-queue rather than shared through the prototype.
  EventEmitter.call(self);

  // Both levels are optional; defaulting them here keeps every later read safe.
  var options = args.options || {};
  var serverOnly = options.serverOnly || {};

  self.name = args.name;
  // used by pub/sub
  self._streamName = 'jcm.' + self.name;

  if (Meteor.isServer && serverOnly.driver) {
    self._driver = new MongoInternals.RemoteCollectionDriver(serverOnly.driver);
    // NOTE(review): JobCollection may expect the driver wrapped in an options
    // object (e.g. { _driver: ... }) rather than passed directly - confirm.
    self.collection = new JobCollection(self.name, self._driver);
  } else {
    self.collection = new JobCollection(self.name);
  }

  if (Meteor.isServer) {
    // Set any custom indexes if collection is empty
    if (serverOnly.customIndexes && self.collection.find().count() === 0) {
      self.collection._ensureIndex(serverOnly.customIndexes);
      // TODO: Set to proper log
      console.info('Indexes set for ' + self.name);
    }

    if (serverOnly.setLogStream) {
      self.collection.setLogStream(serverOnly.setLogStream);
    }

    if (serverOnly.security) {
      if (serverOnly.security.allow) {
        self.collection.allow(serverOnly.security.allow);
      }
      if (serverOnly.security.deny) {
        self.collection.deny(serverOnly.security.deny);
      }
    }

    if (serverOnly.jobsPublish) {
      Meteor.publish(self._streamName, serverOnly.jobsPublish.func);
    }

    // Opt-out semantics: only an explicit `false` prevents the start.
    if (serverOnly.autoStart !== false) {
      self.collection.startJobServer();
    }
  }

  if (options.workers) {
    self.workerGroups = [];

    Meteor.startup(function(){
      options.workers.forEach(function(w){
        // TODO: Set polling interval to 1000000000 if q.options.type === 'observer'
        var workers = self.collection.processJobs(w.tasks, w.options, w.func);

        if (w.type === 'observer') {
          // `tasks` may be one type or a list of types; match any of them.
          var typeSelector = Array.isArray(w.tasks) ? { $in: w.tasks } : w.tasks;

          self.collection.find({ type: typeSelector, status: 'ready' })
            .observe({
              added: function() {
                workers.trigger();
              }
            });
        } else if (w.options && w.options.autoStart === false) {
          workers.pause();
        }

        self.workerGroups.push(workers);
      });
    });
  }
};

// A JobCollectionManager.Queue can emit events. Inherit the emitter methods
// without constructing a shared emitter instance on the prototype.
JobCollectionManager.Queue.prototype = Object.create(EventEmitter.prototype);
JobCollectionManager.Queue.prototype.constructor = JobCollectionManager.Queue;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment