|
import Stomp from 'stomp-client'; |
|
import { activemq } from './config.json'; |
|
import console from './console'; |
|
|
|
// Host name of the machine running this process, as reported by the `os` module.
// QueueClient uses it as the prefix of every subscription id (`${hostname}-${queueName}`).
const hostname = require('os').hostname(); |
|
|
|
/**
 * Keys removed from a job before it is logged by `QueueClient#inspect`.
 * Hoisted so the list is built once instead of on every call.
 */
const INSPECT_HIDDEN_KEYS = [
  'expires',
  'destination',
  'subscription',
  'priority',
  'message-id',
  'timestamp',
  'redelivered',
];

/**
 * Builds the broker destination for a logical queue name.
 *
 * @param {string} queueName - Key looked up in `activemq.queues`.
 * @returns {string} Destination of the form `/queue/<configured name>`.
 */
function queueDestination(queueName) {
  return `/queue/${activemq.queues[queueName]}`;
}

/**
 * Builds the headers sent with both subscribe and unsubscribe for a queue.
 * Keeping this in one place guarantees both calls use the same subscription id.
 *
 * @param {string} queueName - Logical queue name.
 * @returns {{'activemq.prefetchSize': number, ack: string, id: string}} A fresh headers object.
 */
function subscriptionOptions(queueName) {
  return {
    'activemq.prefetchSize': 1,
    ack: 'client-individual',
    id: `${hostname}-${queueName}`,
  };
}

/**
 * Base class for queue consumers/producers built on a `Stomp` client.
 *
 * The constructor opens the connection using the `activemq` configuration and,
 * once connected, subscribes to every queue name it was given. Subclasses are
 * expected to override `process` to handle consumed messages.
 */
class QueueClient {
  /**
   * @param {...string} queueNames - Logical queue names (keys of `activemq.queues`)
   *   subscribed to once the connection is established.
   */
  constructor(...queueNames) {
    this.queueNames = queueNames;
    // The trailing '1.1' is passed through to the Stomp constructor unchanged
    // (presumably the STOMP protocol version - confirm against the client library).
    this.client = new Stomp(activemq.host, activemq.port, activemq.username, activemq.password, '1.1');
    this.client.connect(this.connected.bind(this));
  }

  /**
   * Serializes a job for logging, omitting the keys in `INSPECT_HIDDEN_KEYS`.
   * Only top-level keys are removed; nested objects are left untouched.
   *
   * @param {object} job - JSON-serializable job or message headers.
   * @returns {string} JSON text of the job without the hidden keys.
   */
  inspect(job) {
    // Round-trip through JSON so the caller's object is never mutated by the deletes below.
    const cloned = JSON.parse(JSON.stringify(job));
    INSPECT_HIDDEN_KEYS.forEach((key) => delete cloned[key]);
    return JSON.stringify(cloned);
  }

  /**
   * Connection callback handed to the Stomp client; subscribes to all configured queues.
   */
  connected() {
    this.subscribe();
  }

  /**
   * Unsubscribes from the given queues, or from every queue passed to the
   * constructor when called without arguments.
   *
   * @param {...string} queueNames - Logical queue names to unsubscribe from.
   */
  unsubscribe(...queueNames) {
    const names = queueNames.length ? queueNames : this.queueNames;

    names.forEach((queueName) => {
      this.client.unsubscribe(queueDestination(queueName), subscriptionOptions(queueName));
    });
  }

  /**
   * Subscribes to the given queues, or to every queue passed to the constructor
   * when called without arguments. Each received message is logged and forwarded
   * to `process` together with `ack`/`nack` helpers bound to that message.
   *
   * @param {...string} queueNames - Logical queue names to subscribe to.
   */
  subscribe(...queueNames) {
    const names = queueNames.length ? queueNames : this.queueNames;

    names.forEach((queueName) => {
      const options = subscriptionOptions(queueName);
      const subscriptionId = options.id;

      const onMessage = (body, headers) => {
        const id = headers['message-id'];
        const ack = () => this.client.ack(id, subscriptionId);
        const nack = () => {
          console.error('nack');
          // NOTE: the broker-level negative acknowledgement is currently disabled; this only logs.
          // this.client.nack(id, subscriptionId);
        };

        console.log(`Consuming at ${queueName} => ${this.inspect(headers)}`);
        // The message headers (not the body) are what gets handed to `process` as the job.
        this.process(headers, { ack, nack, queueName });
      };

      this.client.subscribe(queueDestination(queueName), onMessage, options);
    });
  }

  /**
   * Template method: handles one consumed message. Must be overridden.
   *
   * `subscribe` invokes it as `process(headers, { ack, nack, queueName })`.
   *
   * @param {object} job - Headers of the consumed message.
   * @throws {Error} Always, unless a subclass overrides this method.
   */
  process(job) {
    throw new Error('This is a template method meant to be overridden by subclasses');
  }

  /**
   * Publishes a job to the named queue and a copy, tagged with the queue name,
   * to the `echo` queue.
   *
   * @param {string} queueName - Logical queue name (key of `activemq.queues`).
   * @param {object} job - Job to publish; it is passed as both body and headers.
   */
  publish(queueName, job) {
    console.log(`Publish at ${queueName} => ${this.inspect(job)}`);
    this.client.publish(queueDestination(queueName), job, job);

    const data = { ...job, queue: queueName };
    this.client.publish(queueDestination('echo'), data, data);
  }
}
|
|
|
export default QueueClient; |