Created
February 15, 2018 21:30
-
-
Save amarzavery/0a386db40c6d436cbf1d62cec8662018 to your computer and use it in GitHub Desktop.
simple send receive using rhea
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "test-rhea", | |
"version": "0.1.0", | |
"dependencies": { | |
"rhea": "*", | |
"uuid": "*" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const url = require('url'); | |
const rhea = require('rhea'); | |
const assert = require('assert'); | |
const uuid = require('uuid'); | |
/**
 * Opens a connection with `rhea.connect` and waits for the outcome.
 *
 * @param {object} options - Passed unchanged to `rhea.connect`.
 * @returns {Promise<object>} Resolves with the connection object once it emits
 *   `connection_open`; rejects with an `Error` if `connection_close` or
 *   `disconnected` is emitted first.
 */
function connect(options) {
  return new Promise((resolve, reject) => {
    const connection = rhea.connect(options);

    // `once` only removes the listener that actually fired, so the listeners
    // for the other outcomes have to be detached explicitly.
    function detach() {
      connection.removeListener('connection_open', onOpen);
      connection.removeListener('connection_close', onClose);
      connection.removeListener('disconnected', onClose);
    }

    // Turns the event payload into a real Error so callers never receive a
    // bare event context as the rejection reason.
    // NOTE(review): assumes the payload exposes the failure as `context.error`
    // or `context.connection.error` — confirm against the installed rhea version.
    function toError(context) {
      const cause = context && (context.error || (context.connection && context.connection.error));
      if (cause instanceof Error) {
        return cause;
      }
      const detail = cause && (cause.description || cause.condition);
      return new Error(detail ? `connection failed: ${detail}` : 'connection closed before it was opened');
    }

    function onOpen() {
      detach();
      resolve(connection);
    }

    function onClose(context) {
      detach();
      reject(toError(context));
    }

    connection.once('connection_open', onOpen);
    connection.once('connection_close', onClose);
    connection.once('disconnected', onClose);
  });
}
/**
 * Creates a session on the given connection, begins it, and waits for the outcome.
 *
 * @param {object} connection - Object providing `create_session()`.
 * @returns {Promise<object>} Resolves with the session once it emits `session_open`;
 *   rejects with an `Error` if `session_close` is emitted first.
 */
function createSession(connection) {
  return new Promise((resolve, reject) => {
    const session = connection.create_session();

    // `once` only removes the listener that fired; drop the other one too.
    function detach() {
      session.removeListener('session_open', onOpen);
      session.removeListener('session_close', onClose);
    }

    // Turns the event payload into a real Error so callers never receive a
    // bare event context as the rejection reason.
    // NOTE(review): assumes the payload exposes the failure as `context.error`
    // or `context.session.error` — confirm against the installed rhea version.
    function toError(context) {
      const cause = context && (context.error || (context.session && context.session.error));
      if (cause instanceof Error) {
        return cause;
      }
      const detail = cause && (cause.description || cause.condition);
      return new Error(detail ? `session failed: ${detail}` : 'session closed before it was opened');
    }

    function onOpen() {
      detach();
      resolve(session);
    }

    function onClose(context) {
      detach();
      reject(toError(context));
    }

    session.once('session_open', onOpen);
    session.once('session_close', onClose);
    // Listeners are attached before begin() so no outcome can be missed.
    session.begin();
  });
}
/**
 * Attaches a sender link on the session and waits until it can send.
 *
 * `path` and `options` are merged into ONE options object (with `path` as its
 * `target`) before calling `attach_sender`. rhea documents the attach/open
 * functions as taking a single address-or-options argument, so options passed
 * as a second positional argument would be ignored.
 *
 * @param {object} session - Object providing `attach_sender()`.
 * @param {string} [path] - Target address; overrides `options.target` when given.
 * @param {object} [options] - Extra link options; not mutated.
 * @returns {Promise<object>} Resolves with the sender once it emits `sendable`;
 *   rejects with an `Error` if `sender_close` is emitted first.
 */
function createSender(session, path, options) {
  return new Promise((resolve, reject) => {
    // Fresh copy: the caller's object must not be modified by us or the library.
    const linkOptions = { ...options };
    if (path !== undefined && path !== null) {
      linkOptions.target = path;
    }
    const sender = session.attach_sender(linkOptions);

    // `once` only removes the listener that fired; drop the other one too.
    function detach() {
      sender.removeListener('sendable', onOpen);
      sender.removeListener('sender_close', onClose);
    }

    // Turns the event payload into a real Error so callers never receive a
    // bare event context as the rejection reason.
    // NOTE(review): assumes the payload exposes the failure as `context.error`
    // or `context.sender.error` — confirm against the installed rhea version.
    function toError(context) {
      const cause = context && (context.error || (context.sender && context.sender.error));
      if (cause instanceof Error) {
        return cause;
      }
      const detail = cause && (cause.description || cause.condition);
      return new Error(detail ? `sender failed: ${detail}` : 'sender closed before it became sendable');
    }

    function onOpen() {
      detach();
      resolve(sender);
    }

    function onClose(context) {
      detach();
      reject(toError(context));
    }

    sender.once('sendable', onOpen);
    sender.once('sender_close', onClose);
  });
}
/**
 * Attaches a receiver link on the session and waits for it to open.
 *
 * `path` and `options` are merged into ONE options object (with `path` as its
 * `source`) before calling `attach_receiver`. rhea documents the attach/open
 * functions as taking a single address-or-options argument, so options passed
 * as a second positional argument (e.g. `autoaccept`) would be ignored.
 *
 * @param {object} session - Object providing `attach_receiver()`.
 * @param {string} [path] - Source address; overrides `options.source` when given.
 * @param {object} [options] - Extra link options; not mutated.
 * @returns {Promise<object>} Resolves with the receiver once it emits `receiver_open`;
 *   rejects with an `Error` if `receiver_close` is emitted first.
 */
function createReceiver(session, path, options) {
  return new Promise((resolve, reject) => {
    // Fresh copy: the caller's object must not be modified by us or the library.
    const linkOptions = { ...options };
    if (path !== undefined && path !== null) {
      linkOptions.source = path;
    }
    const receiver = session.attach_receiver(linkOptions);

    // `once` only removes the listener that fired; drop the other one too.
    function detach() {
      receiver.removeListener('receiver_open', onOpen);
      receiver.removeListener('receiver_close', onClose);
    }

    // Turns the event payload into a real Error so callers never receive a
    // bare event context as the rejection reason.
    // NOTE(review): assumes the payload exposes the failure as `context.error`
    // or `context.receiver.error` — confirm against the installed rhea version.
    function toError(context) {
      const cause = context && (context.error || (context.receiver && context.receiver.error));
      if (cause instanceof Error) {
        return cause;
      }
      const detail = cause && (cause.description || cause.condition);
      return new Error(detail ? `receiver failed: ${detail}` : 'receiver closed before it was opened');
    }

    function onOpen() {
      detach();
      resolve(receiver);
    }

    function onClose(context) {
      detach();
      reject(toError(context));
    }

    receiver.once('receiver_open', onOpen);
    receiver.once('receiver_close', onClose);
  });
}
/**
 * Parses a `Key=Value;Key=Value` connection string and opens a TLS connection
 * on port 5671 to the host named by its `Endpoint` entry.
 *
 * Reads the `Endpoint`, `SharedAccessKeyName` and `SharedAccessKey` keys; any
 * other keys are ignored.
 *
 * @param {string} connectionString - Semicolon-separated `Key=Value` pairs.
 * @returns {Promise<object>} Whatever `connect` resolves with.
 * @throws {Error} (as a rejection) when `Endpoint` is missing or has no hostname.
 */
async function fromConnectionString(connectionString) {
  const parsed = {};
  for (const part of connectionString.split(';')) {
    // Split on the FIRST '=' only: values may themselves contain '='.
    const splitIndex = part.indexOf('=');
    // Skip empty segments (e.g. a trailing ';') and segments without a key.
    if (splitIndex > 0) {
      parsed[part.substring(0, splitIndex)] = part.substring(splitIndex + 1);
    }
  }

  if (!parsed.Endpoint) {
    throw new Error('connection string has no Endpoint entry');
  }
  const { hostname } = url.parse(parsed.Endpoint);
  if (!hostname) {
    throw new Error(`cannot determine host name from Endpoint "${parsed.Endpoint}"`);
  }

  return connect({
    transport: 'tls',
    host: hostname,
    hostname,
    username: parsed.SharedAccessKeyName,
    password: parsed.SharedAccessKey,
    port: 5671,
    reconnect_limit: 100
  });
}
/**
 * Script entry point: connects using the connection string in `process.argv[2]`,
 * opens one session each for a sender and a receiver on the queue named by
 * `process.argv[3]`, sends one JSON message, and exits after the first message
 * received has been accepted.
 *
 * @returns {Promise<void>} Settles once setup is done and the message is sent;
 *   rejects if either argument is missing or any setup step fails.
 */
async function main() {
  const usage = 'node test.js <connectionString> <queueName>';
  assert.notStrictEqual(process.argv[2], undefined, usage);
  assert.notStrictEqual(process.argv[3], undefined, usage);

  const connection = await fromConnectionString(process.argv[2]);
  console.log('connected');

  const [senderSession, receiverSession] = await Promise.all([
    createSession(connection),
    createSession(connection)
  ]);
  console.log('got sessions');

  const queueName = process.argv[3];
  const [sender, receiver] = await Promise.all([
    createSender(senderSession, queueName, {}),
    createReceiver(receiverSession, queueName, {
      autoaccept: false
    })
  ]);
  console.log('created sender and receiver');

  // Register the handler before sending so the reply cannot be missed.
  receiver.on('message', ({ message, delivery }) => {
    console.log('rx: ', message.body.toString());
    delivery.update(undefined, rhea.message.accepted().described()); // Complete
    // Alternative outcomes, kept for reference:
    // delivery.update(undefined, rhea.message.rejected().described()); // DeadLetter
    // delivery.update(undefined, rhea.message.modified().described({ undeliverable_here: true })); // Abandon
    // delivery.update(undefined, rhea.message.released().described()); // Defer

    // process.exit() terminates immediately, even with I/O still pending, so
    // calling it right here could drop the disposition above before it is
    // written. Close the connection and exit only once it has gone down.
    // NOTE(review): presumes rhea writes the pending disposition before the
    // close frame — confirm against the installed rhea version.
    const exit = () => process.exit();
    connection.once('connection_close', exit);
    connection.once('disconnected', exit);
    connection.close();
  });

  sender.send({
    body: Buffer.from(JSON.stringify({
      hello: uuid.v4()
    }))
  });
  console.log('sending');
  // TODO: handle errors after setup
}
// Run the script; any failure is printed and reported through exit code 1.
main().then(undefined, (failure) => {
  console.error(failure);
  process.exit(1);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I don't think you need to remove the `.once` listeners.