Skip to content

Instantly share code, notes, and snippets.

@amarzavery
Created October 1, 2018 21:28
Show Gist options
  • Save amarzavery/5d12b12ce54a1f0375ddb7a43519d233 to your computer and use it in GitHub Desktop.
Save amarzavery/5d12b12ce54a1f0375ddb7a43519d233 to your computer and use it in GitHub Desktop.
multiple amqp connections in the same process
import {
Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents, types
} from "../lib";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();
const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const receiverAddress = process.env.RECEIVER_ADDRESS || "address";
async function main(): Promise<void> {
const connectionOptions: ConnectionOptions = {
transport: "tls",
host: host,
hostname: host,
username: username,
password: password,
port: port,
reconnect: false
};
const connection: Connection = new Connection(connectionOptions);
const connection2: Connection = new Connection(connectionOptions);
const receiverName = "receiver-1";
// receive messages from the past one hour
const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`;
const receiverOptions: ReceiverOptions = {
name: receiverName,
source: {
address: receiverAddress,
filter: {
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
context.connection.id, context.receiver!.name, sessionError);
}
}
};
await connection.open();
const receiver: Receiver = await connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("[%s] Receiver '%s' received message: %O", context.connection.id,
context.receiver!.name, context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
context.connection.id, receiverName, receiverError);
}
});
await connection2.open();
const receiver2Options = Object.assign(receiverOptions);
receiver2Options.name = "receiver-2";
const receiver2: Receiver = await connection2.createReceiver(receiver2Options);
receiver2.on(ReceiverEvents.message, (context: EventContext) => {
console.log("[%s] Receiver '%s' received message: %O", context.connection.id,
context.receiver!.name, context.message);
});
receiver2.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
context.connection.id, receiverName, receiverError);
}
});
console.log("Done creating both the receivers on separate connections..");
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
await receiver2.close();
await connection.close();
await connection2.close();
}
main().catch((err) => console.log(err));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment