Last active
April 4, 2016 09:12
-
-
Save jhecking/3aeb47ed6adba291f9039ce7c92b04cc to your computer and use it in GitHub Desktop.
Example of Aerospike aggregation query with Lua aggregation function using Aerospike Node.js client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Aerospike = require('aerospike') | |
// Client configuration for the aggregation query example.
// NOTE(review): the modlua paths presumably tell the client where to find the
// user Lua module ('agg') and the client's bundled system Lua files - confirm
// against the client documentation for the installed version.
const config = {
  hosts: '192.168.33.10:3000',
  log: { level: 5 },
  modlua: {
    userPath: './',
    systemPath: './node_modules/aerospike/aerospike-client-c/lua/'
  }
}

// Connect, run the 'check_teamId' aggregation from the 'agg' Lua module over
// the 'test'/'tracking' set, print every result and close the client once the
// result stream has ended.
Aerospike.connect(config, (error, client) => {
  if (error) throw error

  // Arguments are handed to check_teamId(stream, teamId, startDate, endDate)
  // after the stream: team id, start of range, end of range.
  const aggregation = {
    module: 'agg',
    funcname: 'check_teamId',
    args: ['123', 0, 1456499994597]
  }
  const query = client.query('test', 'tracking', { aggregationUDF: aggregation })
  const stream = query.execute()

  // Number of results received on the stream so far.
  let count = 0

  const onError = (error) => console.error('error:', error)
  const onData = (result) => {
    count++
    console.log('result:', result)
  }
  const onEnd = () => {
    console.log('found %d records', count)
    client.close()
  }

  stream.on('error', onError)
  stream.on('data', onData)
  stream.on('end', onEnd)
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Builds a map holding the profile fields copied from a record:
-- interaction, teamId, datetime, timestamp, version and interactions.
local function map_profile(record)
  local fields = {
    interaction = record.interaction,
    teamId = record.teamId,
    datetime = record.datetime,
    timestamp = record.timestamp,
    version = record.version,
    interactions = record.interactions
  }
  return map(fields)
end
-- Stream aggregation entry point: keeps the records that belong to the given
-- team and whose timestamp lies within [startDate, endDate] (both inclusive),
-- and maps each remaining record with map_profile.
--
-- Parameters:
-- stream - Record stream to aggregate over
-- teamId - Team id a record's teamId must equal
-- startDate - Lower bound (inclusive) for a record's timestamp
-- endDate - Upper bound (inclusive) for a record's timestamp
function check_teamId(stream, teamId, startDate, endDate)
  local function filter_teamId(record)
    if record.teamId ~= teamId then
      return false
    end
    local timestamp = record.timestamp
    -- Comparing nil with a number raises an error in Lua, which would abort
    -- the whole aggregation; a record without a timestamp is skipped instead.
    if timestamp == nil then
      return false
    end
    return timestamp >= startDate and timestamp <= endDate
  end
  return stream : filter(filter_teamId) : map(map_profile)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Aerospike = require('aerospike') | |
const Key = Aerospike.Key | |
// Namespace and set the sample records are written to.
const ns = 'test'
const set = 'tracking'

// Bin to index and the name of the secondary index created on it.
const indexBin = 'teamId'
const indexName = 'teamIdx'

// Team ids the sample records are randomly assigned to.
const teamIDs = [
  '123',
  'abc'
]

// Lua module that gets registered with the cluster.
const luaScript = 'agg.lua'

// Range the random sample timestamps are drawn from
// (lower bound included, upper bound excluded - see getRandomInt).
const timestampRange = [
  1456399994597,
  1456599994597
]

// Number of sample records to create and max. number of concurrent commands.
const noRecords = 100
const maxConcurent = 10

// Setup tasks that are still outstanding; taskDone removes them one by one.
var tasks = [
  'create_index',
  'register_udf',
  'create_sample_records'
]
// Mark a task as completed by removing it from the list of outstanding tasks;
// once the list is empty the callback is invoked. A task that is not (or no
// longer) in the list is reported on stderr and otherwise ignored.
//
// Parameters:
// task - Name of the task that has finished
// cb - Function to call when no outstanding tasks remain
function taskDone (task, cb) {
  const position = tasks.indexOf(task)
  if (position < 0) {
    console.error('error: unknown task: %s', task)
    return
  }
  tasks.splice(position, 1)
  if (tasks.length === 0) {
    cb()
  }
}
// Picks a random integer from the half-open range [min, max):
// min can be returned, max never is.
function getRandomInt (min, max) {
  const span = max - min
  const offset = Math.floor(Math.random() * span)
  return offset + min
}
// Create a single sample record
//
// The record is stored under the key 'sample<idx>' in the configured
// namespace/set, with a randomly picked team id, a random timestamp from
// timestampRange (plus its ISO 8601 representation) and random interaction
// counters.
//
// Parameters:
// client - Client instance
// idx - Number of current record to be created
// callback - Function to call when the record was created
function createSampleRecord (client, idx, callback) {
  const key = new Key(ns, set, 'sample' + idx)
  const timestamp = getRandomInt(timestampRange[0], timestampRange[1])
  const record = {
    teamId: teamIDs[getRandomInt(0, teamIDs.length)],
    timestamp: timestamp,
    datetime: new Date(timestamp).toISOString(),
    version: 1,
    interaction: getRandomInt(0, 100),
    interactions: getRandomInt(0, 100)
  }
  // client.put takes (key, bins, meta, policy, callback): meta only carries
  // record metadata (ttl/gen), so the "exists" setting has to go into the
  // write policy - passed as meta it would never be applied.
  const meta = {}
  const policy = { exists: Aerospike.policy.exists.CREATE_OR_REPLACE }
  client.put(key, record, meta, policy, callback)
}
// Create a number of sample records in parallel
//
// Records are numbered 1..recordsToCreate. At most maxConcurrent create
// commands are in flight at any time; whenever one completes, the next
// outstanding record is started. If creating a record fails, the error is
// logged and the process exits with status -1.
//
// Parameters:
// client - Client instance
// recordsToCreate - Number of records to create in total
// maxConcurrent - Max. number of concurrent client commands to issue
// (values below 1 are treated as 1 so progress is always possible)
// done - Function to call once last record was created; it is called exactly
// once with recordsToCreate, also when there is nothing to create
function createSampleRecords (client, recordsToCreate, maxConcurrent, done) {
  const limit = Math.max(1, maxConcurrent)
  let nextIdx = 1
  let inFlight = 0
  let finished = false

  // Reports completion; guarded so done can never fire more than once.
  const finish = () => {
    if (finished) return
    finished = true
    done(recordsToCreate)
  }

  // Starts as many outstanding records as the concurrency limit allows and
  // reports completion once everything was started and nothing is in flight.
  const schedule = () => {
    while (nextIdx <= recordsToCreate && inFlight < limit) {
      const idx = nextIdx++
      inFlight++
      createSampleRecord(client, idx, onCreated)
    }
    if (nextIdx > recordsToCreate && inFlight === 0) {
      finish()
    }
  }

  // Completion callback of a single create command.
  function onCreated (err) {
    if (err) {
      console.error('error creating sample record: %s', err)
      process.exit(-1)
    }
    inFlight--
    schedule()
  }

  schedule()
}
// Create a secondary string index and wait until its creation has completed.
// If either step reports an error, the error is logged and the process exits
// with status -1.
//
// Parameters:
// client - Client instance
// ns - Namespace
// set - Set
// indexBin - Record bin to index
// indexName - Index name
// cb - Function to call when index is created
function createIndex (client, ns, set, indexBin, indexName, cb) {
  // Shared failure handling for both steps: log and terminate.
  const abortOnError = (err) => {
    if (!err) return
    console.error('failed to create secondary index %s on %s: %s', indexName, indexBin, err)
    process.exit(-1)
  }

  // Second step: wait for the index (checked with an interval argument of 100).
  const waitForIndex = () => {
    client.indexCreateWait(ns, indexName, 100, (err) => {
      abortOnError(err)
      cb()
    })
  }

  // First step: issue the create-index command.
  const indexSpec = { ns: ns, set: set, bin: indexBin, index: indexName }
  client.createStringIndex(indexSpec, (err) => {
    abortOnError(err)
    waitForIndex()
  })
}
// Register a Lua script with the cluster and wait until the registration has
// completed. If either step reports an error, the error is logged and the
// process exits with status -1.
//
// Parameters:
// client - Client instance
// luaScript - File path of the LUA script
// cb - Function to call when the script is registered
function registerUDF (client, luaScript, cb) {
  // Shared failure handling for both steps: log and terminate.
  const exitOnFailure = (err) => {
    if (!err) return
    console.error('failed to register lua module "%s": %s', luaScript, err)
    process.exit(-1)
  }

  client.udfRegister(luaScript, (registerErr) => {
    exitOnFailure(registerErr)
    // Registration was accepted; now wait for it to complete
    // (interval argument of 100).
    client.udfRegisterWait(luaScript, 100, (waitErr) => {
      exitOnFailure(waitErr)
      cb()
    })
  })
}
// Entry point of the setup script: connect to the cluster, then start the
// three setup tasks (secondary index, UDF registration, sample records)
// without waiting for one another. taskDone tracks their completion and the
// client is closed once the last task has finished.
Aerospike.connect((err, client) => {
  if (err) {
    console.error('failed to connect to cluster: %s', err)
    process.exit(-1)
  }

  // Runs after the last outstanding task has completed.
  const allTasksDone = () => {
    console.log('all done!')
    client.close()
  }

  const onIndexCreated = () => {
    console.info('secondary index "%s" on "%s" created', indexName, indexBin)
    taskDone('create_index', allTasksDone)
  }

  const onUdfRegistered = () => {
    console.log('UDF module "%s" registered successfully', luaScript)
    taskDone('register_udf', allTasksDone)
  }

  const onRecordsCreated = (created) => {
    console.info('%d sample records created successfully', created)
    taskDone('create_sample_records', allTasksDone)
  }

  createIndex(client, ns, set, indexBin, indexName, onIndexCreated)
  registerUDF(client, luaScript, onUdfRegistered)
  createSampleRecords(client, noRecords, maxConcurent, onRecordsCreated)
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment