Created
October 1, 2018 21:28
-
-
Save amarzavery/5d12b12ce54a1f0375ddb7a43519d233 to your computer and use it in GitHub Desktop.
Multiple AMQP connections in the same process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents, types | |
} from "../lib"; | |
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file | |
dotenv.config(); | |
// Connection settings, read from the environment with placeholder fallbacks
// that are used when a variable is unset or empty.
const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
// Always parse the port as base 10 so a value with a "0x" prefix is never read as hexadecimal.
const port = parseInt(process.env.AMQP_PORT || "5671", 10);
const receiverAddress = process.env.RECEIVER_ADDRESS || "address";
/**
 * Registers console logging for the messages and errors of one receiver.
 *
 * @param receiver The receiver whose events should be logged.
 * @param name The name used to identify the receiver in the log output.
 */
function attachReceiverLogging(receiver: Receiver, name: string): void {
  receiver.on(ReceiverEvents.message, (context: EventContext) => {
    console.log("[%s] Receiver '%s' received message: %O", context.connection.id,
      context.receiver ? context.receiver.name : name, context.message);
  });
  receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
    const receiverError = context.receiver && context.receiver.error;
    if (receiverError) {
      console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
        context.connection.id, name, receiverError);
    }
  });
}

/**
 * Opens two separate connections built from the same options, creates one
 * receiver on each (same source address and filter, different names), logs
 * every received message and receiver/session error, waits two minutes and
 * then closes the receivers and the connections.
 *
 * A connection that was opened is closed again even when a later step throws.
 *
 * @returns A promise that resolves once both connections have been closed.
 */
async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const connection2: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";
  const receiver2Name = "receiver-2";
  // receive messages from the past one hour
  const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`;
  const receiverOptions: ReceiverOptions = {
    name: receiverName,
    source: {
      address: receiverAddress,
      filter: {
        "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
      }
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
          context.connection.id, context.receiver!.name, sessionError);
      }
    }
  };
  // Copy into a fresh object: Object.assign with a single argument returns that
  // same object, so renaming it would also rename the first receiver's options.
  const receiver2Options: ReceiverOptions = Object.assign({}, receiverOptions, { name: receiver2Name });

  await connection.open();
  try {
    const receiver: Receiver = await connection.createReceiver(receiverOptions);
    attachReceiverLogging(receiver, receiverName);

    await connection2.open();
    try {
      const receiver2: Receiver = await connection2.createReceiver(receiver2Options);
      attachReceiverLogging(receiver2, receiver2Name);

      console.log("Done creating both the receivers on separate connections..");
      // sleeping for 2 mins to let the receiver receive messages and then closing it.
      await delay(120000);
      await receiver.close();
      await receiver2.close();
    } finally {
      // Only reached after connection2 opened successfully.
      await connection2.close();
    }
  } finally {
    // Only reached after the first connection opened successfully.
    await connection.close();
  }
}
// Run the sample; report a failure on stderr and signal it through the exit code
// instead of printing it to stdout and exiting successfully.
main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment