Skip to content

Instantly share code, notes, and snippets.

@cfuerst
Last active April 20, 2020 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cfuerst/75382a7487e312c33c0f32a7283b77e7 to your computer and use it in GitHub Desktop.
Save cfuerst/75382a7487e312c33c0f32a7283b77e7 to your computer and use it in GitHub Desktop.
lambda-node-srcds-to-es
/**
* receives logs from srcds engine via config
* logaddress_delall_http (remove all listeners)
* logaddress_add_http "https://foo.bar" (add listener)
* tranform them to meaningful events and send them to elasticsearch
*/
//libs
const https = require('https');
const crypto = require('crypto');
//elasticsearch connection
const elasticsearchRqOptions = {
hostname: process.env.ES_URL,
path: '/_bulk',
port: 9243,
method: 'POST',
headers: {
'Authorization': 'ApiKey ' + process.env.ES_API_KEY,
'Content-Type': 'application/x-ndjson'
}
};
//easticsearch index name
const currentDate = new Date().toJSON().slice(0,10);
const elasticIndexName = 'srcds_logs_'+currentDate;
//identify log events by regex adapted by https://github.com/Nols1000/srcds-logs
const parser = {
"convar": / - "(.*)" = "(.*)"/,
"matchvar": / - server_cvar: "(.*)" "(.*)"/,
"cvarsStart": / - server cvars start/,
"cvarsEnd": / - server cvars end/,
"freeze": / - Starting Freeze period/,
"say": / - "(.*)<(\d*)><(.*)><(.*)>" say "(.*)"/,
"kill": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)"/,
"blinded": / - "(.*)<(\d*)><(.*)><(.*)>" blinded for ([0-9]*\.[0-9]+|[0-9]+) by "(.*)<(\d*)><(.*)><(.*)>" from flashbang(.*)/,
"attack": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] attacked "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)" \(damage "(\d*)"\) (.*) \(hitgroup "(.*)"\)/,
"killH": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] with "(.*)" \((.*)\)/,
"killOther": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] killed other "(.*)<(\d*)>" \[(.*)\] with "(.*)"/,
"killSuicide": / - "(.*)<(\d*)><(.*)><(.*)>" \[(.*)\] committed suicide with "(.*)"/,
"assist": / - "(.*)<(\d*)><(.*)><(.*)>" assisted killing "(.*)<(\d*)><(.*)><(.*)>"/,
"throwNade": / - "(.*)<(\d*)><(.*)><(.*)>" threw (.*) \[(.*) (.*) (.*)\]/,
"projectile":/ - (.*) projectile spawned at (.*) (.*) (.*), velocity (.*) (.*) (.*)/,
"purchase": / - "(.*)<(\d*)><(.*)><(.*)>" purchased "(.*)"/,
"leaveBuyZone": / - "(.*)<(\d*)><(.*)><(.*)>" left buyzone with \[(.*)\]/,
"player": / - "(.*)<(\d*)><(.*)><(.*)>" triggered "(.*)"/,
"playerTeamchange": / - "(.*)<(\d*)><(.*)>" switched from team <(.*)> to <(.*)>/,
"gameOver": / - Game Over: (.*) (\d+) (.*) score (\d+:\d+) after (\d+) min/,
"team": / - Team "(.*)" triggered "(.*)" \(CT "(\d*)"\) \(T "(\d*)"\)"/,
"teamName": / - Team playing "(.*)": (.*)/,
"accoladeFinal": / - ACCOLADE, FINAL: \{(.*)\},(.*)<(\d*)>,(.*),(.*),(.*)/,
"score": / - Team "(.*)" scored "(\d*)" with "(\d*)" players/,
"world": / - World triggered "(.*)"/,
"connect": / - "(.*)<(\d*)><(.*)><(.*)>" connected, address "(.*)"/,
"entered": / - "(.*)<(\d*)><(.*)><(.*)>" entered the game/,
"validated": / - "(.*)<(\d*)><(.*)><(.*)>" STEAM USERID validated/,
"disconnect": / - "(.*)<(\d*)><(.*)><(.*)>" disconnected \((.*)\)/,
"mapLoading": / - Loading map "(.*)"/,
"mapStarted": / - Started map "(.*)" \(CRC "(.*)"\)/,
"playerInfo": / - "(.*)<(\d*)><(.*)><(.*)>" (\[.*\]) (.*)/,
"teamTrigger": / - Team "(.*)" triggered "(.*)" \(([C|T]+) "(\d+)"\) \(([CT]+) "(\d+)"\)/
};
// log timestamp parser
const parserTs = /(\d\d\/\d\d\/\d\d\d\d - \d\d:\d\d:\d\d\.\d\d\d)/;
//do not send those event types to the es cluster
const blacklistedEvents = ['convar', 'matchvar', 'validated', 'entered', 'connect', 'disconnect', 'projectile', 'cvarsStart', 'cvarsEnd', 'playerTeamchange', 'freeze'];
//add a function per event type/key from the regex patterns here to transform a line
const lineTransformersByKey = {
defaultTransformer: function(item, attributes, initiator, opponent) {
var event = {
'eventName': 'event_' + item.key,
'rawEvent': item.matches[0],
'timestamp': getEsTimestampFormat(item.matches[1])
};
if (attributes === Object(attributes)) {
event['eventProperties'+ucfirst(item.key)] = attributes;
}
if (initiator === Object(initiator)) {
event['initiator'] = initiator;
event.initiator.name_if_not_bot = initiator.id === 'BOT' ? 'BOT' : initiator.name;
}
if (opponent === Object(opponent)) {
event['opponent'] = opponent;
event.opponent.name_if_not_bot = opponent.id === 'BOT' ? 'BOT' : opponent.name;
}
if (initiator === Object(initiator) && opponent === Object(opponent)) {
event['self_inflicted'] = initiator.id == opponent.id && initiator.name == opponent.name;
event['opponent_eq_initiator_team'] = initiator.team == opponent.team;
}
return event;
},
kill: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[12],
'distance_meters': calcDistinace(item.matches[6], item.matches[11])
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
},
{
'id': item.matches[9],
'name': item.matches[7],
'team': item.matches[10],
}
);
},
killOther: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[10],
'distance_meters': calcDistinace(item.matches[6], item.matches[9]),
'description': item.matches[7]
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
killSuicide: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[7],
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
attack: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[12],
'distance_meters': calcDistinace(item.matches[6], item.matches[11]),
'damage': parseInt(item.matches[13]),
'hitgroup': item.matches[15],
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
},
{
'id': item.matches[9],
'name': item.matches[7],
'team': item.matches[10],
}
);
},
blinded: function(item) {
return this.defaultTransformer(item,
{
'weapon': 'flashbang',
'blinded_for_seconds': parseFloat(item.matches[6])
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
},
{
'id': item.matches[9],
'name': item.matches[7],
'team': item.matches[10],
}
);
},
assist: function(item) {
return this.defaultTransformer(item, null,
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
},
{
'id': item.matches[8],
'name': item.matches[6],
'team': item.matches[9],
}
);
},
throwNade: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[6]
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
purchase: function(item) {
return this.defaultTransformer(item,
{
'weapon': item.matches[6]
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
leaveBuyZone: function(item) {
let data = item.matches[6].trim(' ').split(' ');
return this.defaultTransformer(item,
{
'gear': data.length > 1 ? data : [],
'no_gear': data.length > 1 ? false : true
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
player: function(item) {
return this.defaultTransformer(item,
{
'command': item.matches[6]
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
playerInfo: function(item) {
return this.defaultTransformer(item,
{
'info': item.matches[7].replace('.','')
},
{
'id': item.matches[4],
'name': item.matches[2],
'team': item.matches[5]
}
);
},
world: function(item) {
let str = item.matches[2].split('" on "');
let data = {'command': str[0]};
if (str.length >= 2) {
data['detail'] = str[1];
}
return this.defaultTransformer(item, data);
},
teamTrigger: function(item) {
let data = { 'command':item.matches[3], 'triggered_by_team': item.matches[2] };
data['team_'+item.matches[4]+'_score'] = item.matches[5];
data['team_'+item.matches[6]+'_score'] = item.matches[7];
return this.defaultTransformer(item, data);
},
score: function(item) {
return this.defaultTransformer(item,
{
'team': item.matches[2],
'score': parseInt(item.matches[3]),
'num_players': parseInt(item.matches[4])
}
);
},
gameOver: function(item) {
return this.defaultTransformer(item,
{
'game_mode': item.matches[2],
'map_id': item.matches[3],
'map_name': item.matches[4],
'score': item.matches[5],
'score_left': item.matches[5].split(':')[0],
'score_right': item.matches[5].split(':')[1],
'minutes': item.matches[6].split(':')[1],
}
);
},
accoladeFinal: function(item) {
return this.defaultTransformer(item,
{
'badge': item.matches[2],
'player_name': item.matches[3],
'player_rank': parseInt(item.matches[4]),
'badge_value': parseFloat(item.matches[5].split(': ')[1]),
'badge_rank': parseFloat(item.matches[6].split(': ')[1]),
'score': parseFloat(item.matches[7].split(': ')[1]),
}
);
}
};
//main handler
exports.handler = async function(event, context, callback) {
//events which were transformed
let transformedEvents = [];
//bulk request body for es
let bulkRequestBody;
// if data was sent
if ('body' in event) {
//assume multilines per request (multi line logs will be sent in certain conditions)
const lines = await cleanRequestBodyAndGetLinesArray(event.body);
//array of transformed events for debug purpose
transformedEvents = await getTransformedEvents(lines);
//debug dont send to es
//return callback(null, {"StatusCode": 200, "events": transformedEvents});
//bulk body for elasticsearch
if (transformedEvents.length > 0) {
//get the full bulk body
bulkRequestBody = await getElasticBulkBody(transformedEvents);
//call elasticsearch to index the data
const response = sendEventsToElasticInBulk(bulkRequestBody);
// promise resolved or rejected asynchronously
response.then((response) => {
// log some summary
var data = JSON.parse(response);
if (data === Object(data) && 'items' in data && 'errors' in data) {
console.log('sucess: ', getBulkRequestSummary(data));
} else {
console.log('fail: unexpected response');
}
}).catch((error) => {
console.log(error);
});
}
}
return callback(null, {"StatusCode": 200});
}
//parse all lines and return the transformed events
function getTransformedEvents(lines) {
let events = [];
//go trough all log entries
lines.forEach(function(line) {
if (line.trim()) {
//try to get a regex match
let result = parseLineAndGetFirstMatch(line);
//if found a match and not blacklisted proceed
if (!blacklistedEvents.includes(result.key)) {
//get the line transformer and call it or default
let transformer = result.key in lineTransformersByKey && result.found ? result.key : 'defaultTransformer';
//get the event in the transformed format
let transformedEvent = lineTransformersByKey[transformer](result);
//do not send to ES if debug
events.push({'event':transformedEvent, 'raw':result});
}
}
});
return events;
}
//do some cleanup and split lines
function cleanRequestBodyAndGetLinesArray(data) {
return unescape(data.toString().replace(/\t/g, '')).split('\n');
}
//find matches against parser regex
function parseLineAndGetFirstMatch(line) {
for (const key in parser) {
const found = line.match(new RegExp(parserTs.source + parser[key].source));
if (found) {
return {'found': true, 'key': key, 'matches': found};
}
}
return {'found': false, 'key': 'unmapped', 'matches': [line, line.substring(0,25)]};
}
//calculate distance between 2 points on the map
function calcDistinace(kPos, vPos) {
const [x1, y1, z1] = kPos.split(' ');
const [x2, y2, z2] = vPos.split(' ');
const distance_units = Math.sqrt(
((x1-x2)**2) + ((y1-y2)**2) + ((z1-z2)**2)
);
const distance_meters = (distance_units * 2.540) / 100;
return Math.round(distance_meters * 100) / 100;
}
//upercase first letter
function ucfirst(str) {
return str.charAt(0).toUpperCase() + str.slice(1);
}
//convert srcds log timestmap to elastic format
function getEsTimestampFormat(str) {
const s1 = str.split(' - ');
const s2 = s1[0].split('/');
return s2[2]+'-'+s2[0]+'-'+s2[1]+"T"+s1[1]+"Z";
}
//get the request body for a bulk index request to elasticsearch
function getElasticBulkBody(items) {
let body = '';
items.forEach(function(item) {
let id = crypto.createHash('md5').update(item.event.rawEvent).digest('hex');
body += '{ "index" : { "_index" : "' + elasticIndexName + '", "_id" : "'+id+'" } }'+"\n";
body += JSON.stringify(item.event)+"\n";
});
return body;
}
//send the event to elasticsearch return promise
function sendEventsToElasticInBulk(bulkBody) {
// define the promise
let request = new Promise((resolve, reject) => {
const req = https.request(elasticsearchRqOptions, (response) => {
let chunks_of_data = [];
response.on('data', (fragments) => {
chunks_of_data.push(fragments);
});
response.on('end', () => {
let response_body = Buffer.concat(chunks_of_data);
resolve(response_body.toString());
});
response.on('error', (error) => {
reject(error);
});
});
req.write(bulkBody);
req.end();
});
return request;
}
//get a summary of what was bulked
function getBulkRequestSummary(data) {
let summary = {};
data.items.forEach(function(item) {
let key = Object.keys(item)[0];
let operation = item[key].result;
let success = item[key]._shards.total === item[key]._shards.successful ? 'success' : 'failed';
let summaryKey = key+'_'+operation+'_'+success;
if (!(summaryKey in summary)) {
summary[summaryKey] = 0;
}
summary[summaryKey]++;
});
return summary;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment