Created
May 7, 2017 23:26
-
-
Save fforres/49682b77dddb63ef08784f06129b0a68 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// Emitted once when the module is loaded, before any handler invocation.
console.log('In Function...');

// Third-party dependencies.
const AWS = require("aws-sdk");
const uuid = require("uuid");

// SQS client and queue URL shared by every function in this module.
const sqs = new AWS.SQS();
const QueueUrl = "https://sqs.us-west-2.amazonaws.com/999999/cola.fifo";
exports.main = (event, context, callback) => { | |
console.log("In main function..."); | |
const arr = []; | |
let messages = null; | |
let result_parseMessage = null; | |
let codigosNoProcesados = null; | |
receiveMessage(1) | |
.then(function (messages) { | |
messages = messages; | |
return parseMessage(messages); | |
}) | |
.then(function (result_parseMessage) { | |
result_parseMessage = result_parseMessage; | |
return splitArray(result_parseMessage.Codes, 1000); | |
}) | |
.then(function (res_splitArray) { | |
return sendCodes(messages, res_splitArray, result_parseMessage); | |
}) | |
.then(function (codigosNoProcesados) { | |
codigosNoProcesados = codigosNoProcesados; | |
return deleteMessage(result_parseMessage.ReceiptHandle) | |
}) | |
.then(function (codigosNoProcesados) { | |
if (codigosNoProcesados.length > 0) { | |
console.log("\t Códigos No Procesados : "+codigosNoProcesados.length); | |
return updateMessage(codigosNoProcesados, result_parseMessage); | |
} else { | |
return deleteMessage(result_parseMessage.ReceiptHandle); | |
} | |
}) | |
.then(function() { | |
console.log('FINISHED') | |
}) | |
.catch(function(error) { | |
console.log(error); | |
//logear errores en otro lado, tirarlos a splunk o q se sho | |
}) | |
} | |
/**
 * Receives up to `numero` messages from the module-level queue (`QueueUrl`)
 * using the module-level `sqs` client, requesting all message attributes.
 *
 * @param {number} numero - Value passed as `MaxNumberOfMessages`.
 * @returns {Promise<object>} Resolves with the raw `receiveMessage` response
 *   when it contains at least one message. Rejects with the SDK error when the
 *   call fails, or with `{ error: "NO hay mensajes" }` when nothing was received.
 */
function receiveMessage(numero) {
  return new Promise(function (resolve, reject) {
    console.log("In function receiveMessage...");
    const params = {
      QueueUrl: QueueUrl,
      MaxNumberOfMessages: numero,
      AttributeNames: ["All"],
    };

    sqs.receiveMessage(params, function (err, messages) {
      console.log("\t Recibiendo Mensaje...");

      // The error must be checked first: on failure the response is not
      // usable, and dereferencing it would throw instead of rejecting.
      if (err) {
        console.log(err, err.stack);
        reject(err);
        return;
      }

      const received = messages ? messages.Messages : undefined;
      if (!Array.isArray(received) || received.length === 0) {
        console.log("\t No Hay Mensajes " + QueueUrl);
        reject({ "error": "NO hay mensajes" });
        return;
      }

      console.log(received[0].Body.length); // size of the first message body
      resolve(messages);
    });
  });
}
/**
 * Extracts the fields the pipeline needs from the first message of a
 * `receiveMessage` response. The message body is parsed as JSON and is read
 * as `{ msg: { data, code } }`.
 *
 * @param {object} messages - Response object holding a `Messages` array.
 * @returns {Promise<object>} Resolves with
 *   `{ MessageId, MessageDeduplicationId, MessageGroupId, Data, Codes, ReceiptHandle }`.
 *   Rejects with `{ error: "NO hay mensajes" }` when there is no first message,
 *   with `{ error: "Parametros insuficientes en function parseMessage", missing }`
 *   when any of the six fields is absent, or with the `JSON.parse` error when
 *   the body is not valid JSON.
 */
function parseMessage(messages) {
  return new Promise(function (resolve, reject) {
    console.log("In funtion parseMessage...");

    const list = messages ? messages.Messages : undefined;
    const first = Array.isArray(list) ? list[0] : undefined;
    if (!first) {
      reject({ "error": "NO hay mensajes" });
      return;
    }

    // A throw here (invalid JSON) rejects the promise via the executor.
    const body = JSON.parse(first.Body);
    const msg = (body && body.msg) || {};
    const attributes = first.Attributes || {};

    const params = {
      MessageId: first.MessageId,
      MessageDeduplicationId: attributes.MessageDeduplicationId,
      MessageGroupId: attributes.MessageGroupId,
      Data: msg.data,
      Codes: msg.code,
      ReceiptHandle: first.ReceiptHandle,
    };

    // A plain object has no `.length`, so the fields are checked one by one.
    const missing = Object.keys(params).filter(function (key) {
      return params[key] == null;
    });
    if (missing.length > 0) {
      reject({
        "error": "Parametros insuficientes en function parseMessage",
        "missing": missing,
      });
      return;
    }

    resolve(params);
  });
}
/**
 * Invokes the `test_sqs_worker` Lambda once per chunk of codes and collects
 * the codes that were not processed.
 *
 * Each invocation receives a copy of `messages` whose first message body is
 * replaced by `{ msg: { data: params.Data, code: <chunk> } }`; the caller's
 * `messages` object is left untouched.
 *
 * A chunk is reported as unprocessed in full when the invocation fails or the
 * worker response cannot be read (invalid JSON, missing `body`), so those
 * codes are not silently dropped. Otherwise the worker's `body.codes` (when
 * defined) is taken as the list of unprocessed codes for that chunk.
 *
 * @param {object} messages - `receiveMessage` response used as payload template.
 * @param {Array<Array>} codes - Chunks of codes, one Lambda invocation each.
 * @param {object} params - Parsed message; only `params.Data` is read.
 * @returns {Promise<Array>} Unprocessed codes, concatenated in chunk order.
 */
function sendCodes(messages, codes, params) {
  console.log("In function sendCodes...");
  const functionName = "test_sqs_worker";
  const lambda = new AWS.Lambda();

  const invocations = codes.map(function (chunk, index) {
    return new Promise(function (resolve) {
      const body = JSON.stringify({ "msg": { "data": params.Data, "code": chunk } });

      // Build the payload from copies so the caller's object is not mutated.
      const firstMessage = Object.assign({}, messages.Messages[0], { Body: body });
      const envelope = Object.assign({}, messages, {
        Messages: [firstMessage].concat(messages.Messages.slice(1)),
      });
      const request = {
        FunctionName: functionName,
        Payload: JSON.stringify(envelope),
      };

      lambda.invoke(request, function (err, data) {
        console.log("\t In Lambda Invoke..." + functionName + " " + index);

        if (err) {
          console.log(err, err.stack);
          // The worker never ran for this chunk: everything is still pending.
          resolve(chunk);
          return;
        }

        let pending;
        try {
          pending = JSON.parse(data.Payload).body.codes;
        } catch (responseError) {
          // Unreadable response: processing cannot be confirmed, so keep the
          // chunk as pending instead of throwing inside the SDK callback.
          console.log("\t Respuesta invalida de..." + functionName + " " + index, responseError);
          resolve(chunk);
          return;
        }

        if (typeof pending !== "undefined") {
          console.log("\t Items no procesados..." + functionName + " " + index);
          console.log("\t No Procesados " + pending.length);
          resolve(pending);
          return;
        }

        console.log("\t Todos los items procesados para..." + functionName + " " + index);
        resolve([]);
      });
    });
  });

  return Promise.all(invocations).then(function (perChunk) {
    return perChunk.reduce(function (all, part) {
      return all.concat(part);
    }, []);
  });
}
/**
 * Deletes a message from the module-level queue (`QueueUrl`) using the
 * module-level `sqs` client.
 *
 * @param {string} ReceiptHandle - Receipt handle of the message to delete.
 * @param {Function} [callback] - Unused; kept so existing call sites keep working.
 * @returns {Promise<object>} Resolves with the `deleteMessage` response;
 *   rejects with the SDK error when the call fails.
 */
function deleteMessage(ReceiptHandle, callback) {
  return new Promise(function (resolve, reject) {
    console.log("In function deleteMessage...");
    const params = {
      QueueUrl: QueueUrl,
      ReceiptHandle: ReceiptHandle,
    };

    sqs.deleteMessage(params, function (err, data) {
      if (err) {
        console.log(err, err.stack);
        reject(err);
        return;
      }

      // Only report the deletion once the call is known to have succeeded.
      console.log("\t Mensaje Eliminado");
      console.log("\t SQS PROCESADO y ELIMINADO");
      resolve(data);
    });
  });
}
/**
 * Splits `codigos` into consecutive slices of at most `block` elements.
 *
 * @param {Array} codigos - Items to partition.
 * @param {number} block - Maximum size of each slice.
 * @param {Function} [callback] - Unused.
 * @returns {Promise<Array<Array>>} Resolves with the list of slices; rejects
 *   with an `{ error }` object when no slice was produced.
 */
function splitArray(codigos, block, callback) {
  return new Promise((resolve, reject) => {
    console.log('In function SplitArray...');

    const totalCodes = codigos.length;
    const fullBlocks = Math.trunc(totalCodes / block);
    const hasPartialBlock = totalCodes % block !== 0;
    const blockCount = hasPartialBlock ? fullBlocks + 1 : fullBlocks;

    const packages = [];
    for (let start = 0; packages.length < blockCount; start += block) {
      packages.push(codigos.slice(start, start + block));
    }

    if (packages.length === 0) {
      reject({ 'error': `Error, Se recibio un array con ${totalCodes} elementos.` });
      return;
    }

    console.log(`\t Total packages ${packages.length}`);
    resolve(packages);
  });
}
/**
 * Sends a new message to the module-level queue (`QueueUrl`) carrying the
 * given codes together with the original message data, using a fresh UUID as
 * deduplication id and the original message group id.
 *
 * @param {Array} codes - Codes to place in the new message body.
 * @param {object} data - Parsed message; `Data` and `MessageGroupId` are read.
 * @returns {Promise<null>} Resolves with `null` once the message was sent;
 *   rejects with the SDK error otherwise.
 */
function updateMessage(codes, data) {
  return new Promise((resolve, reject) => {
    console.log("In function updateMessage...");

    const deduplicationId = uuid.v4();
    const body = { "msg": { 'data': data.Data, 'code': codes } };
    const request = {
      MessageBody: JSON.stringify(body),
      QueueUrl: QueueUrl,
      MessageAttributes: {
        someKey: { DataType: 'String', StringValue: "string" },
      },
      MessageDeduplicationId: deduplicationId,
      MessageGroupId: data.MessageGroupId,
    };

    const onSent = (err, _response) => {
      if (!err) {
        console.log("\t Enviando No procesados a SQS");
        console.log("\t SQS PROCESADO PARCIAL");
        resolve(null);
        return;
      }
      console.error("Unable to add item. Error JSON:", JSON.stringify(err, null, 2));
      reject(err);
    };

    sqs.sendMessage(request, onSent);
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment