Skip to content

Instantly share code, notes, and snippets.

@amarzavery
Created February 15, 2018 21:30
Show Gist options
  • Save amarzavery/0a386db40c6d436cbf1d62cec8662018 to your computer and use it in GitHub Desktop.
Save amarzavery/0a386db40c6d436cbf1d62cec8662018 to your computer and use it in GitHub Desktop.
Simple send and receive example using rhea
{
"name": "test-rhea",
"version": "0.1.0",
"dependencies": {
"rhea": "*",
"uuid": "*"
}
}
const url = require('url');
const rhea = require('rhea');
const assert = require('assert');
const uuid = require('uuid');
/**
 * Starts a connection via `rhea.connect(options)` and waits for its first
 * lifecycle event.
 *
 * @param {object} options - Passed unchanged to `rhea.connect`.
 * @returns {Promise<object>} Resolves with the connection once it emits
 *   `connection_open`. Rejects with the argument delivered by
 *   `connection_close` or `disconnected` if either fires first.
 */
async function connect(options) {
  return new Promise((resolve, reject) => {
    const connection = rhea.connect(options);
    const failureEvents = ['connection_close', 'disconnected'];

    // `once` only unhooks the listener that actually fired, so the other
    // listeners are removed by hand as soon as the promise is settled.
    const detach = () => {
      connection.removeListener('connection_open', handleOpened);
      for (const eventName of failureEvents) {
        connection.removeListener(eventName, handleFailed);
      }
    };
    const handleOpened = () => {
      detach();
      resolve(connection);
    };
    const handleFailed = (reason) => {
      detach();
      reject(reason);
    };

    connection.once('connection_open', handleOpened);
    for (const eventName of failureEvents) {
      connection.once(eventName, handleFailed);
    }
  });
}
/**
 * Creates a session on `connection`, begins it, and waits for the first
 * `session_open` or `session_close` event.
 *
 * @param {object} connection - Object exposing `create_session()`; the session
 *   it returns must provide `once`, `removeListener` and `begin`.
 * @returns {Promise<object>} Resolves with the session when `session_open`
 *   fires. Rejects with the argument delivered by `session_close` if that
 *   fires first.
 */
async function createSession(connection) {
  return new Promise((resolve, reject) => {
    const session = connection.create_session();

    // `once` only unhooks the listener that fired; drop the other one too so
    // nothing stays attached after the promise settles.
    const detach = () => {
      session.removeListener('session_open', handleOpened);
      session.removeListener('session_close', handleClosed);
    };
    const handleOpened = () => {
      detach();
      resolve(session);
    };
    const handleClosed = (reason) => {
      detach();
      reject(reason);
    };

    session.once('session_open', handleOpened);
    session.once('session_close', handleClosed);
    session.begin();
  });
}
/**
 * Attaches a sender link on `session` and waits for the first `sendable` or
 * `sender_close` event.
 *
 * @param {object} session - Object exposing `attach_sender(path, options)`.
 * @param {*} path - Forwarded unchanged as the first argument of `attach_sender`.
 * @param {object} options - Forwarded unchanged as the second argument.
 * @returns {Promise<object>} Resolves with the sender when `sendable` fires.
 *   Rejects with the argument delivered by `sender_close` if that fires first.
 */
async function createSender(session, path, options) {
  return new Promise((resolve, reject) => {
    const sender = session.attach_sender(path, options);

    // `once` only unhooks the listener that fired; drop the other one too so
    // nothing stays attached after the promise settles.
    const detach = () => {
      sender.removeListener('sendable', handleSendable);
      sender.removeListener('sender_close', handleClosed);
    };
    const handleSendable = () => {
      detach();
      resolve(sender);
    };
    const handleClosed = (reason) => {
      detach();
      reject(reason);
    };

    sender.once('sendable', handleSendable);
    sender.once('sender_close', handleClosed);
  });
}
/**
 * Attaches a receiver link on `session` and waits for the first
 * `receiver_open` or `receiver_close` event.
 *
 * @param {object} session - Object exposing `attach_receiver(path, options)`.
 * @param {*} path - Forwarded unchanged as the first argument of `attach_receiver`.
 * @param {object} options - Forwarded unchanged as the second argument.
 * @returns {Promise<object>} Resolves with the receiver when `receiver_open`
 *   fires. Rejects with the argument delivered by `receiver_close` if that
 *   fires first.
 */
async function createReceiver(session, path, options) {
  return new Promise((resolve, reject) => {
    const receiver = session.attach_receiver(path, options);

    // `once` only unhooks the listener that fired; drop the other one too so
    // nothing stays attached after the promise settles.
    const detach = () => {
      receiver.removeListener('receiver_open', handleOpened);
      receiver.removeListener('receiver_close', handleClosed);
    };
    const handleOpened = () => {
      detach();
      resolve(receiver);
    };
    const handleClosed = (reason) => {
      detach();
      reject(reason);
    };

    receiver.once('receiver_open', handleOpened);
    receiver.once('receiver_close', handleClosed);
  });
}
/**
 * Opens a TLS connection described by a `key=value;key=value` connection string.
 *
 * The string is split on `;`, and each segment on its first `=`, so values may
 * themselves contain `=`. Segments without `=` (for example the empty segment
 * left by a trailing `;`) are ignored. The keys read are `Endpoint`,
 * `SharedAccessKeyName` and `SharedAccessKey`.
 *
 * @param {string} connectionString - Connection string to parse.
 * @returns {Promise<object>} The promise returned by `connect`, called with
 *   port 5671 and the host name taken from `Endpoint`.
 * @throws {TypeError} (as a rejection) when the string has no `Endpoint` entry.
 */
async function fromConnectionString(connectionString) {
  const parsed = connectionString.split(';').reduce((acc, part) => {
    const splitIndex = part.indexOf('=');
    if (splitIndex === -1) {
      // Nothing to record: no key/value separator in this segment.
      return acc;
    }
    return {
      ...acc,
      [part.substring(0, splitIndex)]: part.substring(splitIndex + 1)
    };
  }, {});

  if (typeof parsed.Endpoint !== 'string') {
    throw new TypeError('connection string is missing an "Endpoint" entry');
  }

  // Parse once; the same host name is used for both `host` and `hostname`.
  const { hostname } = url.parse(parsed.Endpoint);

  return connect({
    transport: 'tls',
    host: hostname,
    hostname,
    username: parsed.SharedAccessKeyName,
    password: parsed.SharedAccessKey,
    port: 5671,
    reconnect_limit: 100
  });
}
/**
 * Sends one message to a queue and receives it back over a single connection.
 *
 * Reads the connection string from `process.argv[2]` and the queue name from
 * `process.argv[3]`. It opens a connection, creates one session each for the
 * sender and the receiver, attaches both links to the queue, sends a JSON body
 * containing a fresh UUID, and accepts the first message received. The process
 * exits once the connection has been closed.
 *
 * @returns {Promise<void>} Settles after the message has been handed to the
 *   sender; receiving and shutdown happen later in event handlers.
 * @throws {assert.AssertionError} (as a rejection) when either command-line
 *   argument is missing.
 */
async function main() {
  const usage = 'node test.js <connectionString> <queueName>';
  const [, , connectionString, queueName] = process.argv;
  assert.notStrictEqual(connectionString, undefined, usage);
  assert.notStrictEqual(queueName, undefined, usage);

  const connection = await fromConnectionString(connectionString);
  console.log('connected');

  const [senderSession, receiverSession] = await Promise.all([
    createSession(connection),
    createSession(connection)
  ]);
  console.log('got sessions');

  const [sender, receiver] = await Promise.all([
    createSender(senderSession, queueName, {}),
    createReceiver(receiverSession, queueName, {
      autoaccept: false
    })
  ]);
  console.log('created sender and receiver');

  // Attach the handler before sending so the reply cannot arrive while nobody
  // is listening. Only the first message is handled; the script then shuts down.
  receiver.once('message', ({ message, delivery }) => {
    console.log('rx: ', message.body.toString());
    delivery.update(undefined, rhea.message.accepted().described()); // Complete
    // Alternative outcomes (labels are the original author's, not verified here):
    //delivery.update(undefined, rhea.message.rejected().described()); // DeadLetter
    // delivery.update(undefined, rhea.message.modified().described({ undeliverable_here: true })); // Abandon
    // delivery.update(undefined, rhea.message.released().described()); // Defer

    // Calling process.exit() right here could end the process before the
    // disposition above has been written to the socket. Close the connection
    // instead and exit once the close has completed.
    connection.once('connection_close', () => process.exit());
    // If the socket drops first, the disposition may not have reached the peer.
    connection.once('disconnected', () => process.exit(1));
    connection.close();
  });

  sender.send({
    body: Buffer.from(JSON.stringify({
      hello: uuid.v4()
    }))
  });
  console.log('sending');
  // TODO: handle errors after setup
}
/**
 * Last-resort handler for a rejected `main()`: prints the rejection value to
 * stderr and ends the process with exit code 1.
 */
function reportFatalError(error) {
  console.error(error);
  process.exit(1);
}

// Script entry point.
main().catch(reportFatalError);
@tony-gutierrez
Copy link

Don't think you need to remove .once listeners.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment