Skip to content

Instantly share code, notes, and snippets.

@almost
Last active December 22, 2015 11:40
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save almost/cd046427ce9203fbe3f3 to your computer and use it in GitHub Desktop.
Example of using through2-concurrent with sqs-readable-stream
var AWS = require('aws-sdk');
var SQSReadableStream = require('sqs-readable-stream');
var through2Concurrent = require('through2-concurrent');
var sqs = new AWS.SQS({
apiVersion: '2012-11-05',
region: 'us-east-1',
accessKeyId: 'YOUR AMAZON ACCESS KEY'
});
var sqsStream = new SQSReadableStream({
sqsClient: sqs,
queueUrl: queueUrl
});
sqsStream
.pipe(through2Concurrent.obj({maxConcurrency: 10}, function (message, _enc, callback) {
var self = this;
console.log('New message:', message.Body);
message.deleteMessage(function (err) {
if (err) {
console.log('Failed to delete message');
} else {
console.log('Deleted message');
self.push(message);
}
callback(err);
});
}))
.on('data', function (message) {
console.log('Message that has already been deleted:', message);
});
sqsStream.on('error', function (error) {
console.log('Error receiving messages:', error);
});
@frontierpsycho
Copy link

Some corrections to make it more copy-pastable:

var sqsStream = new SQSReadableStream({
  sqsClient: sqs,
  queueUrl: queueUrl
});
.pipe(through2Concurrent.obj({maxConcurrency: 10}, function (message, _enc, callback) {

(note .obj)

@frontierpsycho
Copy link

And one more thing: if the created transform stream emits an error (on line 26, callback(err)), then node unpipes it from sqsStream. See here:

http://www.bennadel.com/blog/2679-how-error-events-affect-piped-streams-in-node-js.htm

@almost
Copy link
Author

almost commented Dec 22, 2015

Thanks @frontierpsycho, I've updated it with your fixes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment