Skip to content

Instantly share code, notes, and snippets.

@rphillips
Created July 30, 2014 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rphillips/510069bac2cf7eff9d19 to your computer and use it in GitHub Desktop.
Save rphillips/510069bac2cf7eff9d19 to your computer and use it in GitHub Desktop.
var async = require('async');
var _ = require('underscore');
var ld = require('ld');
var dbopsEntity = require('../db/ops/entity');
var dbopsAgentConnection = require('../db/ops/agent_connection');
var dbopsAgent = require('../db/ops/agent');
var errors = require('../util/errors');
var flowCtrl = require('rackspace-shared-utils/lib/flow_control');
var log = require('logmagic').local('ele.lib.util.agent');
var instruments = require('rackspace-shared-utils/lib/instruments');
var CHECK_TYPES = require('../db/models/check_type').CHECK_TYPES;
var settings = require('./settings');
var aeUtils = require('../agent_endpoint/utils');
var getEndpointClient = require('../agent_endpoint/endpoint_client').getEndpointClient;
var getHostInfoNameFromAgentType = require('../agent_endpoint/endpoint_client').getHostInfoNameFromAgentType;
// NOTE(review): MAX_RETRIES is not referenced anywhere in the visible part of
// this file - confirm whether it is still needed.
var MAX_RETRIES = 3;
// Fallback used by the AgentHelper constructor when no `timeout` option is given.
var DEFAULT_AGENT_TIMEOUT = 60 * 1000; // milliseconds
// Thrift Error Messages
// A `response.error` equal to this string is reported as errors.AgentNotConnected
// by the response parsers in this module.
var THRIFT_NO_CONNECTIONS_ERROR = 'No connections';
/**
 * Ranks how 'far' a destination datacenter is from the current one.
 *
 * The rank is the string distance between the two datacenter names as
 * reported by ld.computeDistance, so that near-identical names such as
 * dfw and dfw1 rank close together. It is not a network measurement; an
 * ideal implementation would use latency information for every target
 * datacenter rather than a string distance.
 *
 * @param {String} current Current Datacenter to measure from.
 * @param {String} dest Destination Datacenter to measure to.
 * @return {Number} Ascending order rank (lower is better).
 */
function datacenterDistance(current, dest) {
  var rank = ld.computeDistance(current, dest);

  return rank;
}
/**
 * AgentHelper
 *
 * Looks up the connections of an agent and tries to run a caller supplied
 * method against each connection until one attempt succeeds.
 *
 * Before calling 'run', at least one of these must be provided through the
 * matching setter:
 *  - AgentID
 *  - EntityID
 *  - Entity
 *
 * Whichever one is present is used to locate the agent connections; when
 * that is not possible an error is handed to the 'run' callback.
 *
 * Note: The parameters from the iterator callback get passed through to the
 * completion callback.
 *
 * @param {Object} options Various options. `timeout` is stored on the
 *   instance; a missing or falsy value falls back to DEFAULT_AGENT_TIMEOUT.
 *
 * @constructor
 */
function AgentHelper(options) {
  var opts = options ? options : {};
  var timeout = opts.timeout ? opts.timeout : DEFAULT_AGENT_TIMEOUT;

  // Identifiers start out unset; callers fill them in through the setters.
  this.agentId = null;
  this.entityId = null;
  this.entity = null;

  this.timeout = timeout;
}
/** Set the Agent ID.
 * When an agent ID is present, `run` uses it as-is and does not consult the
 * entity or entity ID.
 * @param {String} id The Agent's ID.
 */
AgentHelper.prototype.setAgentId = function(id) {
this.agentId = id;
};
/** Set the Entity ID.
 * `run` loads this entity through dbopsEntity.get to read its `agent_id`,
 * but only when neither an agent ID nor an entity object has been set.
 * @param {String} id The Entity's ID.
 */
AgentHelper.prototype.setEntityId = function(id) {
this.entityId = id;
};
/** Set the Entity.
 * `run` reads `agent_id` from this object when no agent ID has been set.
 * @param {Object} entity The Entity.
 */
AgentHelper.prototype.setEntity = function(entity) {
this.entity = entity;
};
/** Attempt the endpoints.
 *
 * Works in two phases:
 *  1. Every connection is probed with getAgentConnStatus; connections whose
 *     probe reports an error are dropped.
 *  2. The remaining connections are ordered (closest datacenter first, or
 *     shuffled when settings.DATACENTER is not set) and `iter` is run
 *     against each one in turn until a call completes without an error.
 *
 * On success every argument `iter` passed to its callback is forwarded to
 * `callback`. When all attempts fail only the last error is forwarded, and
 * when no connection survives the probe an AgentEndpointError is reported.
 *
 * @param {Context} ctx The context.
 * @param {Array} connections The agent connections.
 * @param {Function} iter The iterator to run (agentCtx, callback).
 * @param {Function} callback The callback (err, {iterator callback params}).
 */
AgentHelper.prototype._tryEndpoints = function(ctx, connections, iter, callback) {
  var self = this,
      // Guarded: calling toLowerCase() on a missing setting would throw
      // before the randomized fallback below could ever be reached.
      mydc = settings.DATACENTER ? settings.DATACENTER.toLowerCase() : null;

  function sortConnections(conns) {
    if (mydc !== null) {
      return _.sortBy(conns, function(conn) {
        return datacenterDistance(mydc, conn.datacenter.toLowerCase());
      });
    }

    /* In the event our system is misconfigured, use a randomized order. */
    return _.shuffle(conns);
  }

  // Truth test for async.filter: keeps a connection when its status probe
  // does not report an error.
  function isConnected(conn, callback) {
    var client = getEndpointClient(conn.endpoint_thrift_addr);

    // NOTE(review): the probe timeout constant was never declared; the
    // helper's own timeout is used instead - confirm the intended value.
    client.getAgentConnStatus(ctx, conn.guid, self.timeout, function(err) {
      callback(!err);
    });
  }

  // Runs `iter` against candidates[index] and moves on to the next
  // candidate whenever the attempt reports an error.
  function attempt(candidates, index) {
    var conn = candidates[index],
        thriftAddr = conn.endpoint_thrift_addr,
        agentCtx;

    log.debug('Attempting Agent Endpoint Thrift Connection', {
      thrift_addr: thriftAddr,
      ctx: ctx
    });

    agentCtx = {};
    agentCtx.client = getEndpointClient(thriftAddr);
    agentCtx.connections = connections;
    agentCtx.agentId = self.agentId;
    agentCtx.ctx = ctx;
    agentCtx.guid = conn.guid;

    iter(agentCtx, function(err) {
      var responseArgs = Array.prototype.slice.call(arguments);

      if (err) {
        log.debug('Received error from agent endpont', {
          ctx: ctx,
          agentId: self.agentId,
          thriftAddr: thriftAddr,
          err: err
        });

        if (index + 1 < candidates.length) {
          attempt(candidates, index + 1);
          return;
        }

        // Every candidate failed: report the last error only.
        callback(err);
        return;
      }

      log.debug('Successful response from agent endpoint', {ctx: ctx, agentId: self.agentId, thriftAddr: thriftAddr});
      callback.apply(null, responseArgs);
    });
  }

  // The completion callback receives only the filtered list (no error
  // argument), matching the async.filter usage this file was written for.
  async.filter(connections, isConnected, function(live) {
    if (live.length === 0) {
      log.debug('Could not connect to agent', { ctx: ctx, agentId: self.agentId });
      callback(new errors.AgentEndpointError(null, 400, 'Could not connect to agent'));
      return;
    }

    attempt(sortConnections(live), 0);
  });
};
/** Run an iter given a context.
 *
 * Steps:
 *  1. Resolve the agent ID: an explicitly set agent ID wins, otherwise it is
 *     read from the entity object, otherwise from the entity fetched by ID.
 *  2. Fail with AgentNotBound when no agent ID could be resolved.
 *  3. Check that the agent exists and fetch its connections.
 *  4. Fail with AgentNotConnected when the connection list is empty,
 *     otherwise hand the connections to _tryEndpoints.
 *
 * @param {Context} ctx The context.
 * @param {Function} iter The iterator (agentCtx, callback).
 * @param {Function} callback The completion callback (err, {iterator callback params}).
 */
AgentHelper.prototype.run = function(ctx, iter, callback) {
  var self = this;

  if (!_.isString(self.agentId) &&
      !_.isString(self.entityId) &&
      !self.entity) {
    // Nothing to look the agent up with; report on the next tick rather
    // than synchronously.
    process.nextTick(function() {
      callback(new errors.AgentDoesNotExistError(self.agentId));
    });
    return;
  }

  async.auto({
    getEntity: function getEntity(callback) {
      if (self.agentId) {
        callback();
        return;
      }

      if (self.entity) {
        self.agentId = self.entity.agent_id;
        callback();
      } else if (self.entityId) {
        dbopsEntity.get(ctx, self.entityId, function(err, entity) {
          if (err) {
            callback(err);
            return;
          }

          self.agentId = entity.agent_id;
          callback();
        });
      } else {
        // Reachable when the only identifier supplied is an empty string: it
        // passes the isString guard above but is falsy here. Without this
        // branch the task never completed and `run` hung; continuing lets
        // validateAgentId report AgentNotBound.
        callback();
      }
    },

    validateAgentId: ['getEntity', function validateAgentId(callback) {
      if (!self.agentId) {
        callback(new errors.AgentNotBound());
      } else {
        callback();
      }
    }],

    getAgent: ['validateAgentId', function getAgent(callback) {
      dbopsAgent.get(ctx, self.agentId, function(err, agent) {
        // Translate the generic "not found" error into the agent specific one.
        if (err && err instanceof errors.ObjectDoesNotExistError) {
          callback(new errors.AgentDoesNotExistError(self.agentId));
          return;
        }

        callback(err);
      });
    }],

    getConnections: ['validateAgentId', function getAllConnections(callback) {
      dbopsAgentConnection.getAll(ctx, self.agentId, callback);
    }],

    validateConnections: ['getAgent', 'getConnections', function validateConnections(callback, results) {
      // Presumably getAll yields more than one value, so the task result is
      // an array whose first element is the connection list - verify against
      // dbopsAgentConnection.getAll.
      var connections = results.getConnections[0];

      // This agent is not connected if length == 0
      if (connections.length === 0) {
        callback(new errors.AgentNotConnected(self.agentId));
        return;
      }

      callback(null, connections);
    }]
  },
  function(err, results) {
    if (err) {
      // NOTE(review): the 'agent_not_connected' event is recorded only for
      // errors that are NOT AgentNotConnected - confirm this is intended.
      if (!(err instanceof errors.AgentNotConnected)) {
        log.debug('Error validating agent before running operation.', {ctx: ctx, actual_err: err});
        instruments.recordEvent('lib.util.agent.agent_not_connected');
      }

      callback(err);
      return;
    }

    self._tryEndpoints(ctx, results.validateConnections, iter, callback);
  });
};
/** Export AgentHelper */
exports.AgentHelper = AgentHelper;
/** Gathers host information types by Agent ID
* @param {Context} ctx agent The Context.
* @param {String} agentId Agent ID.
* @param {Function} callback expects(err, result).
*/
exports.hostInfoTypesByAgent = function(ctx, agentId, callback) {
async.waterfall([
function queryTheEndpoint(callback) {
var helper;
function iter(agentCtx, callback) {
agentCtx.client.getHostInfoTypes(agentCtx.ctx, agentCtx.guid, callback);
}
helper = new AgentHelper();
helper.setAgentId(agentId);
helper.run(ctx, iter, callback);
},
function parseResponse(response, info, callback) {
var obj = {};
obj.types = [];
_.each(response.types, function(typename) {
obj.types.push(getHostInfoNameFromAgentType(typename));
});
obj.getSerializerType = function() {
return 'agent_list_host_info_types';
};
callback(null, obj);
}
], callback);
};
/** Gathers host information by Agent ID
* @param {Context} ctx agent The Context.
* @param {Object} hostInfoType HostInfoType object.
* @param {String} agentId Agent ID.
* @param {Function} callback expects(err, result).
*/
exports.hostInfoByAgent = function(ctx, hostInfoType, agentId, callback) {
async.waterfall([
function queryTheEndpoint(callback) {
var helper;
function iter(agentCtx, callback) {
agentCtx.client.getHostInfo(agentCtx.ctx, agentCtx.guid, hostInfoType.type, callback);
}
helper = new AgentHelper();
helper.setAgentId(agentId);
helper.run(ctx, iter, callback);
},
function parseResponse(response, info, callback) {
var metrics, serializerType = hostInfoType.serializerType,
obj = {};
if (response.metrics && (response.metrics instanceof Array)) {
if (hostInfoType.isArray) {
metrics = aeUtils.convertMetricsToSwizObject(response.metrics, serializerType);
}
else {
metrics = aeUtils.convertMetricToSwizObject(response.metrics[0], serializerType);
}
obj.error = response.error;
obj.info = metrics;
obj.timestamp = response.timestamp.valueOf();
obj.getSerializerType = function() {
return 'agent_host_info';
};
callback(null, obj);
}
else if (response.error) {
if (response.error === THRIFT_NO_CONNECTIONS_ERROR) {
callback(new errors.AgentNotConnected());
} else {
callback(new errors.AgentError(response.error));
}
}
else {
callback(new errors.AgentError('Invalid Response'));
}
}
], callback);
};
/** Tell Agent it's schedule changed.
* @param {Context} ctx agent The Context.
* @param {String} agentId Agent ID.
* @param {Function} callback expects(err, result).
*/
exports.sendCheckScheduleChanged = function(ctx, agentId, callback) {
async.waterfall([
function queryTheEndpoint(callback) {
var helper;
function iter(agentCtx, callback) {
agentCtx.client.sendCheckScheduleChanged(agentCtx.ctx, agentCtx.guid, callback);
}
helper = new AgentHelper();
helper.setAgentId(agentId);
helper.run(ctx, iter, callback);
},
function parseResponse(response, info, callback) {
if (response.error) {
if (response.error === THRIFT_NO_CONNECTIONS_ERROR) {
callback(new errors.AgentNotConnected());
return;
}
else {
callback(new errors.AgentError(response.error));
return;
}
}
else {
callback(null, response);
}
}
], callback);
};
/** Gathers Target informatin by Agent ID.
* @param {Context} ctx agent The Context.
* @param {String} checkType the check type.
* @param {String} agentId Agent ID.
* @param {Function} callback expects(err, result).
*/
exports.checkTargets = function(ctx, checkType, agentId, callback) {
async.waterfall([
function queryTheEndpoint(callback) {
var helper;
function iter(agentCtx, callback) {
agentCtx.client.getTargets(agentCtx.ctx, agentCtx.guid, checkType, callback);
}
helper = new AgentHelper();
helper.setAgentId(agentId);
helper.run(ctx, iter, callback);
},
function parseResponse(response, info, callback) {
if (!response.targets) {
callback(new errors.CheckDoesNotSupportTargets(checkType));
return;
}
callback(null, response.targets.map(function(id) {
return {
id: id,
label: id, // TODO: improve these labels
getSerializerType: function() {
return 'agent_check_targets';
}
};
}));
}
], callback);
};
/** Is this an agent check type.
* @param {String} type The check type.
* @return {Boolean} true/false.
*/
exports.isAgentCheckType = function(type) {
return type.indexOf('agent.') === 0 && CHECK_TYPES[type];
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment