Skip to content

Instantly share code, notes, and snippets.

@lehno
Last active October 26, 2019 20:28
Show Gist options
  • Save lehno/947dafda091e4704acd6b262c99fceeb to your computer and use it in GitHub Desktop.
Save lehno/947dafda091e4704acd6b262c99fceeb to your computer and use it in GitHub Desktop.
SQS Mixin
'use strict';
// Use this to produce and consume jobs from AWS SQS as a normal queue list.
// Just add as a mixin and listen to your queues
const { Consumer } = require('sqs-consumer');
const Producer = require('sqs-producer');
const _ = require('lodash');
const AWS = require('aws-sdk');
module.exports = function (url) {
return {
settings: {
url: url || 'https://sqs.eu-west-1.amazonaws.com/<USERID>'
},
methods: {
getSQSQueueConsumer (name, fn) {
if (!this.$consumers[name]) {
try {
let consumer = Consumer.create({
queueUrl: `${this.settings.url}/${name}`,
sqs: new AWS.SQS(),
handleMessage: fn.bind(this)
});
consumer.on('error', (err) => {
this.logger.error(err);
});
consumer.on('processing_error', (err) => {
this.logger.error(err);
});
consumer.on('timeout_error', (err) => {
this.logger.error(err);
});
this.$consumers[name] = consumer;
} catch (err) {
this.logger.error(err);
return this.Promise.reject('Unable to start queue');
}
}
return this.$consumers[name];
},
getSQSQueueProducer (name) {
if (!this.$producers[name]) {
this.$producers[name] = Producer.create({
queueUrl: `${this.settings.url}/${name}`,
sqs: new AWS.SQS()
});
}
return this.$producers[name];
},
async addSQSJob (name, message) {
let queue = this.getSQSQueueProducer(name);
queue.send(JSON.stringify(message), function (err) {
if (err) this.logger.error(err);
});
},
},
created () {
AWS.config.loadFromPath('./config/env/aws-development.json');
this.$consumers = {};
this.$producers = {};
},
started () {
if (!this.settings.url) {
return this.Promise.reject('Missing options URL');
}
try {
if (this.schema.SQSQueues) {
_.forIn(this.schema.SQSQueues, (fn, name) => {
let channel = this.getSQSQueueConsumer(name, fn);
channel.start();
});
}
return this.Promise.resolve();
} catch (err) {
this.logger.error(err);
return this.Promise.reject('Unable to connect to AMQP');
}
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment