Skip to content

Instantly share code, notes, and snippets.

@jhecking
Last active April 4, 2016 09:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhecking/3aeb47ed6adba291f9039ce7c92b04cc to your computer and use it in GitHub Desktop.
Save jhecking/3aeb47ed6adba291f9039ce7c92b04cc to your computer and use it in GitHub Desktop.
Example of Aerospike aggregation query with Lua aggregation function using Aerospike Node.js client
const Aerospike = require('aerospike')
const config = {
hosts: '192.168.33.10:3000',
log: { level: 5 },
modlua: {
userPath: './',
systemPath: './node_modules/aerospike/aerospike-client-c/lua/'
}
}
Aerospike.connect(config, (error, client) => {
if (error) throw error
var statement = {
aggregationUDF: {
module: 'agg',
funcname: 'check_teamId',
args: ['123', 0, 1456499994597]
}
}
var query = client.query('test', 'tracking', statement)
var stream = query.execute()
var count = 0
stream.on('error', (error) => console.error('error:', error))
stream.on('data', (result) => {
count++
console.log('result:', result)
})
stream.on('end', () => {
console.log('found %d records', count)
client.close()
})
})
local function map_profile(record)
return map {interaction=record.interaction,
teamId=record.teamId, datetime=record.datetime,
timestamp=record.timestamp, version=record.version,
interactions=record.interactions}
end
function check_teamId(stream, teamId, startDate, endDate)
local function filter_teamId(record)
return record.teamId == teamId and
record.timestamp >= startDate and record.timestamp <= endDate
end
return stream : filter(filter_teamId) : map(map_profile)
end
const Aerospike = require('aerospike')
const Key = Aerospike.Key
const ns = 'test'
const set = 'tracking'
const indexBin = 'teamId'
const indexName = 'teamIdx'
const teamIDs = ['123', 'abc']
const luaScript = 'agg.lua'
const timestampRange = [1456399994597, 1456599994597]
const noRecords = 100
const maxConcurent = 10
var tasks = ['create_index', 'register_udf', 'create_sample_records']
// Track which tasks have been completed and call callback function if all
// tasks are done.
function taskDone (task, cb) {
var idx = tasks.indexOf(task)
if (idx >= 0) {
tasks.splice(idx, 1)
if (tasks.length === 0) {
cb()
}
} else {
console.error('error: unknown task: %s', task)
}
}
// Returns a random integer between min (included) and max (excluded).
function getRandomInt (min, max) {
return Math.floor(Math.random() * (max - min)) + min
}
// Create a single sample record
//
// Parameters:
// client - Client instance
// idx - Number of current record to be created
// callback - Function to call when the record was created
function createSampleRecord (client, idx, callback) {
var key = new Key(ns, set, 'sample' + idx)
var teamID = teamIDs[getRandomInt(0, teamIDs.length)]
var timestamp = getRandomInt(timestampRange[0], timestampRange[1])
var datetime = new Date(timestamp).toISOString()
var record = {
teamId: teamID,
timestamp: timestamp,
datetime: datetime,
version: 1,
interaction: getRandomInt(0, 100),
interactions: getRandomInt(0, 100)
}
var meta = {exists: Aerospike.policy.exists.CREATE_OR_REPLACE}
client.put(key, record, meta, callback)
}
// Create a number of sample records in parallel
//
// Parameters:
// client - Client instance
// recordsToCreate - Number of records to create in total
// maxConcurrent - Max. number of concurrent client commands to issue
// done - Function to call once last record was created
function createSampleRecords (client, recordsToCreate, maxConcurrent, done) {
var recordIdx = 0
var inFlight = 0
var createCb = function (err, key) {
if (err) {
console.error('error creating sample record: %s', err)
process.exit(-1)
}
if (key !== null) {
// console.info('sample record created: %s', key.key)
inFlight--
}
recordIdx++
if (recordIdx <= recordsToCreate && inFlight < maxConcurrent) {
createSampleRecord(client, recordIdx, createCb)
inFlight++
} else if (recordIdx > recordsToCreate && inFlight === 0) {
done(recordsToCreate)
}
}
for (var i = 0; i < maxConcurrent; i++) {
createCb(null, null)
}
}
// Create a secondary index
//
// Parameters:
// client - Client instance
// ns - Namespace
// set - Set
// indexBin - Record bin to index
// indexName - Index name
// cb - Function to call when index is created
function createIndex (client, ns, set, indexBin, indexName, cb) {
var options = {
ns: ns,
set: set,
bin: indexBin,
index: indexName
}
client.createStringIndex(options, (err) => {
if (err) {
console.error('failed to create secondary index %s on %s: %s', indexName, indexBin, err)
process.exit(-1)
}
client.indexCreateWait(ns, indexName, 100, (err) => {
if (err) {
console.error('failed to create secondary index %s on %s: %s', indexName, indexBin, err)
process.exit(-1)
}
cb()
})
})
}
// Register a LUA script
//
// Parameters:
// client - Client instance
// luaScript - File path of the LUA script
// cb - Function to call when the script is registered
function registerUDF (client, luaScript, cb) {
client.udfRegister(luaScript, (err) => {
if (err) {
console.error('failed to register lua module "%s": %s', luaScript, err)
process.exit(-1)
}
client.udfRegisterWait(luaScript, 100, (err) => {
if (err) {
console.error('failed to register lua module "%s": %s', luaScript, err)
process.exit(-1)
}
cb()
})
})
}
Aerospike.connect((err, client) => {
if (err) {
console.error('failed to connect to cluster: %s', err)
process.exit(-1)
}
var done = function () {
console.log('all done!')
client.close()
}
createIndex(client, ns, set, indexBin, indexName, () => {
console.info('secondary index "%s" on "%s" created', indexName, indexBin)
taskDone('create_index', done)
})
registerUDF(client, luaScript, () => {
console.log('UDF module "%s" registered successfully', luaScript)
taskDone('register_udf', done)
})
createSampleRecords(client, noRecords, maxConcurent, (created) => {
console.info('%d sample records created successfully', created)
taskDone('create_sample_records', done)
})
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment