Created
March 10, 2020 20:47
-
-
Save kevboutin/cff7c6106a2332cb1a4d4e0ad1fcf641 to your computer and use it in GitHub Desktop.
Processing messages from a queue using Azure Service Bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { ServiceBusClient } = require('@azure/service-bus'); | |
const moment = require('moment'); | |
// Service Bus connection string, read from the SB_CONNECTION environment variable.
const connectionString = process.env.SB_CONNECTION;
// Name of the Service Bus queue to receive from, read from SB_QUEUE_NAME.
const queueName = process.env.SB_QUEUE_NAME;
/**
 * Closes a Service Bus resource, logging (instead of throwing) on failure so
 * that the remaining resources can still be released.
 *
 * @param {object} context The context used for logging.
 * @param {string} label Human-readable name of the resource, used in the log line.
 * @param {{close: function(): Promise<void>}} closable The resource to close.
 * @returns {Promise<void>} A promise that always resolves.
 */
const closeAndLog = async (context, label, closable) => {
    try {
        await closable.close();
    } catch (error) {
        context.log(`Caught an error closing the ${label}.`, error);
    }
};

/**
 * Processing messages from service bus queue.
 *
 * Receives up to 100 messages, one at a time, logs each one and completes it.
 * Stops early as soon as a receive call returns no message. Errors raised
 * while receiving or completing are logged and not rethrown; the receiver,
 * queue client and Service Bus client are closed in every case.
 *
 * @param {object} context The context. Only `context.log` is used.
 * @param {object} entry The service bus queue object. It is only logged.
 * @returns {Promise<void>} A promise.
 */
const processMessagesHandler = async (context, entry) => {
    // Presumably ReceiveMode.peekLock in @azure/service-bus v1: messages are
    // explicitly completed below. TODO confirm against the installed SDK.
    const RECEIVE_MODE = 1;
    // Upper bound on the number of messages handled per invocation.
    const MAX_MESSAGES = 100;
    // Second argument of receiveMessages (presumably the max wait in seconds).
    const MAX_WAIT = 5;

    context.log('Processing messages from queue.', entry);

    const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
    let queueClient;
    let receiver;
    try {
        // If sending to a Topic, use `createTopicClient` instead of `createQueueClient`
        queueClient = sbClient.createQueueClient(queueName);
        receiver = queueClient.createReceiver(RECEIVE_MODE);

        try {
            for (let i = 0; i < MAX_MESSAGES; i++) {
                const [message] = await receiver.receiveMessages(1, MAX_WAIT);
                if (message === undefined) {
                    context.log('No more messages to receive.');
                    break;
                }

                const { body } = message;
                context.log('Message:', body);
                const now = moment();
                // Difference between now and the `time` carried in the message body.
                const diff = now.diff(body.time);
                context.log(`[${diff}] Received message #${i} at ${now}: ${body.id} - ${body.fullName}`);
                await message.complete();
            }
        } catch (error) {
            context.log('Caught an error trying to receive messages or closing the queue.', error);
        }
    } finally {
        // Release resources in reverse order of creation, even when receiving
        // failed or when creating the queue client / receiver threw.
        if (receiver) {
            await closeAndLog(context, 'receiver', receiver);
        }
        if (queueClient) {
            await closeAndLog(context, 'queue client', queueClient);
        }
        await sbClient.close();
    }
};
// Export the handler as the module's single entry point.
module.exports = processMessagesHandler;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment