-
-
Save didil/8040e2b6dc4f4daa6e4bdbd1755df187 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const path = require('path'); | |
require('dotenv').config({path: path.resolve(process.cwd(), '../.env')}); | |
const express = require('express'); | |
const app = express(); | |
const http = require('http'); | |
const bodyParser = require('body-parser'); | |
const amqp = require('amqplib'); | |
// Middleware | |
app.use(bodyParser.json()); | |
// simulate request ids | |
let lastRequestId = 1; | |
// RabbitMQ connection string | |
const messageQueueConnectionString = process.env.CLOUDAMQP_URL; | |
// handle the request | |
app.post('/api/v1/processData', async function (req, res) { | |
// save request id and increment | |
let requestId = lastRequestId; | |
lastRequestId++; | |
// connect to Rabbit MQ and create a channel | |
let connection = await amqp.connect(messageQueueConnectionString); | |
let channel = await connection.createConfirmChannel(); | |
// publish the data to Rabbit MQ | |
let requestData = req.body.data; | |
console.log("Published a request message, requestId:", requestId); | |
await publishToChannel(channel, { routingKey: "request", exchangeName: "processing", data: { requestId, requestData } }); | |
// send the request id in the response | |
res.send({ requestId }) | |
}); | |
// utility function to publish messages to a channel | |
function publishToChannel(channel, { routingKey, exchangeName, data }) { | |
return new Promise((resolve, reject) => { | |
channel.publish(exchangeName, routingKey, Buffer.from(JSON.stringify(data), 'utf-8'), { persistent: true }, function (err, ok) { | |
if (err) { | |
return reject(err); | |
} | |
resolve(); | |
}) | |
}); | |
} | |
async function listenForResults() { | |
// connect to Rabbit MQ | |
let connection = await amqp.connect(messageQueueConnectionString); | |
// create a channel and prefetch 1 message at a time | |
let channel = await connection.createChannel(); | |
await channel.prefetch(1); | |
// start consuming messages | |
await consume({ connection, channel }); | |
} | |
// consume messages from RabbitMQ | |
function consume({ connection, channel, resultsChannel }) { | |
return new Promise((resolve, reject) => { | |
channel.consume("processing.results", async function (msg) { | |
// parse message | |
let msgBody = msg.content.toString(); | |
let data = JSON.parse(msgBody); | |
let requestId = data.requestId; | |
let processingResults = data.processingResults; | |
console.log("Received a result message, requestId:", requestId, "processingResults:", processingResults); | |
// acknowledge message as received | |
await channel.ack(msg); | |
}); | |
// handle connection closed | |
connection.on("close", (err) => { | |
return reject(err); | |
}); | |
// handle errors | |
connection.on("error", (err) => { | |
return reject(err); | |
}); | |
}); | |
} | |
// Start the server | |
const PORT = 3000; | |
server = http.createServer(app); | |
server.listen(PORT, "localhost", function (err) { | |
if (err) { | |
console.error(err); | |
} else { | |
console.info("Listening on port %s.", PORT); | |
} | |
}); | |
// listen for results on RabbitMQ | |
listenForResults(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment