Skip to content

Instantly share code, notes, and snippets.

@fforres
Created May 7, 2017 23:26
Show Gist options
  • Save fforres/49682b77dddb63ef08784f06129b0a68 to your computer and use it in GitHub Desktop.
Save fforres/49682b77dddb63ef08784f06129b0a68 to your computer and use it in GitHub Desktop.
'use strict';
console.log('In Function...');
var AWS = require("aws-sdk");
var uuid = require("uuid");
var sqs = new AWS.SQS();;
var QueueUrl = "https://sqs.us-west-2.amazonaws.com/999999/cola.fifo";
exports.main = (event, context, callback) => {
console.log("In main function...");
const arr = [];
let messages = null;
let result_parseMessage = null;
let codigosNoProcesados = null;
receiveMessage(1)
.then(function (messages) {
messages = messages;
return parseMessage(messages);
})
.then(function (result_parseMessage) {
result_parseMessage = result_parseMessage;
return splitArray(result_parseMessage.Codes, 1000);
})
.then(function (res_splitArray) {
return sendCodes(messages, res_splitArray, result_parseMessage);
})
.then(function (codigosNoProcesados) {
codigosNoProcesados = codigosNoProcesados;
return deleteMessage(result_parseMessage.ReceiptHandle)
})
.then(function (codigosNoProcesados) {
if (codigosNoProcesados.length > 0) {
console.log("\t Códigos No Procesados : "+codigosNoProcesados.length);
return updateMessage(codigosNoProcesados, result_parseMessage);
} else {
return deleteMessage(result_parseMessage.ReceiptHandle);
}
})
.then(function() {
console.log('FINISHED')
})
.catch(function(error) {
console.log(error);
//logear errores en otro lado, tirarlos a splunk o q se sho
})
}
function receiveMessage(numero){
return new Promise(function(resolve, reject) {
console.log("In function receiveMessage...");
var params = {
QueueUrl: QueueUrl,
MaxNumberOfMessages: numero,
AttributeNames: [
"All"
]
};
sqs.receiveMessage(params, function(err, messages) {
console.log("\t Recibiendo Mensaje...");
if(messages.Messages){
if (err) {
console.log(err, err.stack); // an error occurred
reject(err);
}
else{
console.log(messages.Messages[0].Body.length); // successful response
resolve(messages)
}
}else{
console.log("\t No Hay Mensajes " + QueueUrl);
reject({"error": "NO hay mensajes"})
}
});
})
}
function parseMessage(messages){
return new Promise(function(resolve, reject) {
console.log("In funtion parseMessage...");
if(messages.Messages){
var body = JSON.parse(messages.Messages[0].Body);
var params = {};
params.MessageId = messages.Messages[0].MessageId;
params.MessageDeduplicationId = messages.Messages[0].Attributes.MessageDeduplicationId;
params.MessageGroupId = messages.Messages[0].Attributes.MessageGroupId;
params.Data = body.msg.data;
params.Codes = body.msg.code;
params.ReceiptHandle = messages.Messages[0].ReceiptHandle;
if(params.length<6){
reject({"error": "Parametros insuficientes en function parseMessage"})
}else{
resolve(params);
}
}else{
reject({"error": "NO hay mensajes"})
}
})
}
function sendCodes(messages,codes,params){
console.log("In function sendCodes...");
let codigosNoProcesados = [];
var cantidad = codes.length;
var strBody = {
"msg" : {
"data" : params.Data
}
};
var arr = [];
var lambda = new AWS.Lambda();
for (var i = 0; i < cantidad; i++) {
const index = i;
const lambdapromisificado = new Promise(function(resolve, reject) {
strBody.msg.code = codes[i];
messages.Messages[0].Body = JSON.stringify(strBody);
var params = {
FunctionName: "test_sqs_worker",
Payload: JSON.stringify(messages),
};
lambda.invoke(params, function(err, data) {
var msj = JSON.parse(params.Payload)
console.log("\t In Lambda Invoke..."+params.FunctionName+" "+index);
if (err) {
console.log(err, err.stack); // an error occurred
}else{
var retorno = JSON.parse(data.Payload);
if(typeof(retorno.body.codes) !== "undefined"){
console.log("\t Items no procesados..."+params.FunctionName+" "+index);
console.log("\t No Procesados "+retorno.body.codes.length);
codigosNoProcesados = codigosNoProcesados.concat(retorno.body.codes);
}else{
console.log("\t Todos los items procesados para..."+params.FunctionName+" "+index);
}
}
resolve();
});//LambdaInvoke
});//promise
arr.push(lambdapromisificado);
}//for
return Promise.all(arr).then(function() {
return Promise.resolve(codigosNoProcesados);
});
}
function deleteMessage(ReceiptHandle,callback){
return new Promise(function(resolve, reject) {
console.log("In function deleteMessage...");
var params = {
QueueUrl: QueueUrl,
ReceiptHandle: ReceiptHandle
};
sqs.deleteMessage(params, function(err, data) {
console.log("\t Mensaje Eliminado");
console.log("\t SQS PROCESADO y ELIMINADO");
if (err) {
console.log(err, err.stack); // an error occurred
reject(err);
}else{
resolve(data); // successful response
}
});
})
}
function splitArray(codigos,block,callback){
return new Promise(function(resolve, reject) {
console.log('In function SplitArray...');
var total_code=codigos.length;
var flag = Math.trunc(codigos.length / block);
var remaind = codigos.length % block;
var control = (remaind === 0) ? flag : flag+1;
var array =[];
var contador = 0;
for (var i = 0; i < control; i++) {
array.push((codigos.slice(contador,contador+block)));
contador+= block;
}
if(array.length==0){
reject({'error':'Error, Se recibio un array con '+total_code+' elementos.'});
}else{
console.log('\t Total packages '+array.length);
resolve(array);
}
})
}
function updateMessage(codes,data){
return new Promise(function(resolve, reject) {
console.log("In function updateMessage...");
var id = uuid.v4();
var params = {
MessageBody: JSON.stringify({"msg":{'data':data.Data,'code':codes}}),
QueueUrl: QueueUrl,
MessageAttributes: {
someKey: { DataType: 'String', StringValue: "string"}
},
MessageDeduplicationId: id,
MessageGroupId: data.MessageGroupId
};
sqs.sendMessage(params, function(err, res) {
if (err) {
console.error("Unable to add item. Error JSON:", JSON.stringify(err, null, 2));
reject(err);
}else{
console.log("\t Enviando No procesados a SQS");
console.log("\t SQS PROCESADO PARCIAL")
resolve(null);
}
});
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment