Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save AnalogJ/bbec266c6d85dc2d215f to your computer and use it in GitHub Desktop.
Save AnalogJ/bbec266c6d85dc2d215f to your computer and use it in GitHub Desktop.
Duck-typing Sails.js for Background Tasks via Kue. See http://blog.thesparktree.com
var _ = require('lodash'),
Waterline = require('waterline'),
path = require('path'),
url = require('url'),
kue = require('kue'),
redis = require('../../node_modules/sails/node_modules/socket.io/node_modules/redis'),
q = require('q')
////////////////////////////////////////////////////////////////////
//// SAILS ENV
////////////////////////////////////////////////////////////////////
// Builds a minimal stand-in for the `sails` global without lifting a Sails app:
// only `sails.config` (nconf, filefog, constants) and `sails.log` are populated here.
// Resolve the sails config files required.
var config_path = path.resolve(__dirname,'../..', 'config/')
var nconf_required = require(config_path+'/nconf.js');
// The whole nconf.js export becomes sails.config; later code reads sails.config.nconf from it.
global.sails = {
config: nconf_required
};
// NOTE(review): filefog.js and constants.js are required only after global.sails exists,
// presumably because they read sails.config at load time — confirm before reordering.
var filefog_required = require(config_path+'/filefog.js');
sails.config.filefog = filefog_required.filefog;
var constants_required = require(config_path+'/constants.js');
sails.config.constants = constants_required.constants;
// Use the project's custom logger as sails.log.
sails.log = require(config_path+'/log.js').log.custom
// Parse DATABASE_URL into its components; they are used further down to build
// the Waterline connection settings (host, port, credentials, database name).
var connection = url.parse(sails.config.nconf.get('DATABASE_URL'))
////////////////////////////////////////////////////////////////////
//// REDIS CONFIG
////////////////////////////////////////////////////////////////////
// Redis client exposed as a global; the model publish* helpers use it to publish
// on the "dispatch" channel. auth_pass falls back to null when no AUTH value is configured.
global.redis_client = redis.createClient(sails.config.nconf.get("REDIS_CONNECTION:PORT"), sails.config.nconf.get("REDIS_CONNECTION:HOST"),{auth_pass: sails.config.nconf.get("REDIS_CONNECTION:AUTH") || null});
/**
 * Build the JSON string published on the redis "dispatch" channel to announce
 * a model-level event (e.g. created / updated) for one record.
 *
 * @param {string} model_name - model identity, used in the room name and as the event name.
 * @param {*} model_id - id of the affected record.
 * @param {string} action - room suffix, e.g. "create" or "update".
 * @param {string} verb - verb placed in the payload, e.g. "created" or "updated".
 * @param {*} data - record payload; its toJSON() result is used when it has one,
 *                   otherwise the value is serialized as-is (plain objects, null, undefined).
 * @returns {string} JSON-encoded wrapper { nodeId, args: [room, packet, null, []] }.
 */
function generate_model_message(model_name,model_id,action, verb,data){
    // Waterline records expose toJSON(); plain objects and missing payloads do not,
    // so only call it when it is actually available instead of throwing.
    var payload = (data !== null && data !== undefined && typeof data.toJSON === "function")
        ? data.toJSON()
        : data;
    var message = {
        "name":model_name,
        "args":[{
            "verb" : verb,
            "data" : payload,
            "id" : model_id
        }]
    };
    var wrapper = {};
    wrapper.nodeId = 648745922; //this could be randomly chosen if we cant determine the client id.
    wrapper.args = [
        // room name the event is addressed to
        "/sails_model_"+model_name+"_"+model_id + ":"+action,
        // NOTE(review): "5:::" is presumably the socket.io event-packet prefix — confirm.
        "5:::"+JSON.stringify(message),
        null,
        []
    ]
    return JSON.stringify(wrapper);
}
/**
 * Build the JSON string published on the redis "dispatch" channel to announce
 * that an associated record was added to / removed from a collection attribute.
 *
 * @param {string} model_name - model identity, used in the room name and as the event name.
 * @param {*} model_id - id of the parent record.
 * @param {string} attribute - name of the association attribute that changed.
 * @param {*} id_associated - id of the record that was added or removed.
 * @param {string} action - room segment, e.g. "add" or "remove".
 * @param {string} verb - verb placed in the payload, e.g. "addedTo" or "removedFrom".
 * @param {string} verbId - payload key under which id_associated is stored, e.g. "addedId".
 * @returns {string} JSON-encoded wrapper { nodeId, args: [room, packet, null, []] }.
 */
function generate_association_message(model_name,model_id,attribute, id_associated, action, verb, verbId){
    // Room the event is addressed to: one per record, action and attribute.
    var room = "/sails_model_" + model_name + "_" + model_id + ":" + action + ":" + attribute;

    // Payload describing the change; the key holding the associated id varies per verb.
    var change = { "verb": verb, "attribute": attribute, "id": model_id };
    change[verbId] = id_associated;

    var packet = "5:::" + JSON.stringify({ "name": model_name, "args": [change] });

    return JSON.stringify({
        nodeId: 648745922, //this could be randomly chosen if we cant determine the client id.
        args: [room, packet, null, []]
    });
}
////////////////////////////////////////////////////////////////////
//// WATERLINE CONFIG
////////////////////////////////////////////////////////////////////
var orm = new Waterline();
// Require any waterline compatible adapters here
var postgresqlAdapter = require('sails-postgresql');

/**
 * Split the "user:password" auth component of a parsed URL.
 * Only the FIRST ':' separates user from password, so a password that itself
 * contains ':' is kept intact.
 *
 * @param {?string} auth - the `auth` field from url.parse(); null/undefined when the URL has no credentials.
 * @returns {{user: (string|undefined), password: (string|undefined)}}
 */
function split_connection_auth(auth){
    if (!auth) {
        return { user: undefined, password: undefined };
    }
    var separator = auth.indexOf(':');
    if (separator === -1) {
        return { user: auth, password: undefined };
    }
    return {
        user: auth.slice(0, separator),
        password: auth.slice(separator + 1)
    };
}
var credentials = split_connection_auth(connection.auth);

// Build A Config Object
var config = {
    // Setup Adapters
    // Creates named adapters that have been required
    adapters: {
        'sails-postgresql': postgresqlAdapter
    },
    // Build Connections Config
    // Setup connections using the named adapter configs
    connections: {
        qtPostgresqlServer: {
            adapter: 'sails-postgresql',
            host: connection.hostname,
            port: connection.port || 5432,
            user: credentials.user,
            password: credentials.password,
            // pathname (unlike path) excludes any "?query" suffix, so URL options
            // such as "?ssl=true" do not end up in the database name.
            database: (connection.pathname || '').substring(1)
        }
    },
    defaults: {
        migrate: 'alter'
    }
};
//////////////////////////////////////////////////////////////////
// WATERLINE SERVICES
//////////////////////////////////////////////////////////////////
// Absolute path of the app's api/ directory, two levels above this file.
var api_dir = path.resolve(__dirname, '../..', 'api/')

// Load every *.js file under api/services (the directory is optional) ...
var services = require('include-all')({
    dirname: api_dir + '/services',
    filter: /(.+)\.js$/,
    excludeDirs: /^\.(git|svn)$/,
    optional: true
});

// ... and expose each loaded service as a global under its file-derived key.
_.forEach(services, function expose_service(service_module, service_name){
    sails.log.info("Loading service: " + service_name)
    global[service_name] = service_module;
});
//////////////////////////////////////////////////////////////////
// WATERLINE MODELS
//////////////////////////////////////////////////////////////////
// load models: every *.js file under api/models (the directory is optional)
var models = require('include-all')({
    dirname : api_dir +'/models',
    filter : /(.+)\.js$/,
    excludeDirs : /^\.(git|svn)$/,
    optional : true
});
// Register each model definition with the ORM, after attaching the connection
// name and redis-backed publish* helpers.
_.forEach(models, function(model,key){
    sails.log.info("Register model: "+key)
    model.identity = key.toLowerCase();
    model.connection = 'qtPostgresqlServer';
    //add publish methods
    // Each helper publishes a pre-serialized message on the redis "dispatch" channel.
    // publishCreate must announce "create"/"created" and publishUpdate
    // "update"/"updated" (the two were previously swapped).
    model.publishCreate = function(id, data){
        redis_client.publish("dispatch", generate_model_message(model.identity,id,"create","created",data))
    };
    model.publishUpdate = function(id, data){
        redis_client.publish("dispatch", generate_model_message(model.identity,id,"update","updated",data))
    };
    model.publishAdd = function(id,attribute, idAdded){
        redis_client.publish("dispatch", generate_association_message(model.identity,id,attribute, idAdded, "add", "addedTo", "addedId"))
    };
    model.publishRemove = function(id,attribute, idRemoved){
        redis_client.publish("dispatch", generate_association_message(model.identity,id,attribute, idRemoved, "remove", "removedFrom", "removedId"))
    };
    var waterline_model = Waterline.Collection.extend(model);
    orm.loadCollection(waterline_model);
});
//////////////////////////////////////////////////////////////////
// Initialization
//////////////////////////////////////////////////////////////////
/**
 * Wait for the global redis client to report "ready".
 *
 * @returns {Promise} promise resolved with the global redis_client once its
 *                    "ready" event fires. It is never rejected by this function.
 */
function init_redis(){
    var pending = q.defer();

    function on_ready(){
        sails.log.info("Redis ready")
        return pending.resolve(redis_client);
    }

    redis_client.on("ready", on_ready);
    return pending.promise;
}
/**
 * Initialize the Waterline ORM with the module-level `config`.
 *
 * @returns {Promise} promise resolved with the value orm.initialize() passes to
 *                    its callback on success, or rejected with the error it reports.
 */
function init_waterline(){
    var pending = q.defer();

    // Start Waterline passing adapters in
    orm.initialize(config, function on_initialized(err, ontology) {
        if (!err) {
            sails.log.info("Waterline ready")
            return pending.resolve(ontology);
        }
        return pending.reject(err)
    });

    return pending.promise;
}
// Bootstrap: once both redis and Waterline are ready, expose the models,
// start the kue queue and register every job handler found under ./jobs.
q.spread([init_redis(),init_waterline()],function(redis_client,waterline_models){
    sails.models = waterline_models.collections;
    sails.connections = waterline_models.connections;
    // Expose each collection as a capitalized global (e.g. "user" -> global.User).
    _.forEach(sails.models, function(model, name){
        var global_name = name.charAt(0).toUpperCase() + name.slice(1);
        global[global_name] = model;
    })
    sails.log.info("Starting kue")
    var kue_engine = kue.createQueue({
        prefix: 'kue',
        redis: {
            port: sails.config.nconf.get('REDIS_CONNECTION:PORT'),
            host: sails.config.nconf.get('REDIS_CONNECTION:HOST'),
            auth: sails.config.nconf.get('REDIS_CONNECTION:AUTH') || null
        }
    });
    //register jobs.
    // Each file under ./jobs becomes the processor for the job type named after it.
    var jobs = require('include-all')({
        dirname : __dirname +'/jobs',
        filter : /(.+)\.js$/,
        excludeDirs : /^\.(git|svn)$/,
        optional : true
    });
    _.forEach(jobs, function(job, name){
        sails.log.info("Registering kue handler: "+name)
        kue_engine.process(name, job);
    })
    // On SIGTERM, ask kue to shut down (5000 is the timeout passed to shutdown) and then exit.
    process.once('SIGTERM', function (sig) {
        kue_engine.shutdown(function (err) {
            sails.log.error("Shutting down kue")
            if (err) {
                // Surface shutdown problems instead of dropping them silently.
                sails.log.error("Kue shutdown error: ", err)
            }
            process.exit(0);
        }, 5000);
    });
})
// Terminate the promise chain: without this, a failed redis/Waterline init or an
// exception thrown in the handler above would be swallowed and the worker would
// sit idle with no error reported. done() rethrows any unhandled rejection.
.done();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment