Create a gist now

Instantly share code, notes, and snippets.

Embed
Distributing Work
module.exports = {
"env": {
"es6": true,
"node": true
},
"extends": "eslint:recommended",
"parserOptions": {
"sourceType": "module"
},
"rules": {
"indent": [
"error",
2
],
"linebreak-style": [
"error",
"unix"
],
"quotes": [
"error",
"single"
],
"semi": [
"error",
"always"
]
}
};

activemq

docker run --name='activemq' -it --rm -P \
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=admin' \
-e 'ACTIVEMQ_WRITE_LOGIN=username' -e 'ACTIVEMQ_WRITE_PASSWORD=password' \
-e 'ACTIVEMQ_STATIC_QUEUES=optimization/pending;optimization/done;errored;completed;notification/pending;notification/done' \
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \
-p 8161:8161 \
-p 61616:61616 \
-p 61613:61613 \
webcenter/activemq:5.14.3

sampling

mkdir -p watched/{completed,errored,queued,resized}

repeat 200 { touch watched/$RANDOM.txt  }

processes

npm run watcher
npm run processor
npm run notifier
npm run server

dashboard

http://localhost:8161/admin/queues.jsp
http://localhost:3000/
tree watched

references

http://camel.apache.org/enterprise-integration-patterns.html
{
"webserver": {
"port": 3000
},
"watched": "watched",
"activemq": {
"host": "127.0.0.1",
"port": 61613,
"username": "username",
"password": "password",
"queues": {
"queued": "queued",
"propagation": "propagation",
"processed": "processed",
"errored": "errored",
"completed": "completed",
"locked": "locked",
"notified": "notified",
"echo": "echo"
}
}
}
const logger = {
log: (...args) => console.log('->', ...args),
warn: (...args) => console.log('->', ...args),
error: (...args) => console.log('ERROR:', ...args),
};
export default logger;
import fs from 'fs';
import { spawn } from 'child_process';
import config from './config';
const test = path => /.*\.jpg/.test(path);
const notify = (job, cb) => {
const err = Math.random() > 0.9 ? 'Notification failed!' : '';
cb(err, 'notification response');
};
const process = (job, done) => {
if (/error/.test(job.base)) {
return done('Couldn"t process');
}
const reader = fs.createReadStream(`${config.watched}/queued/${job.base}`);
const writer = fs.createWriteStream(`${config.watched}/resized/${job.base}`);
const args = [
'jpg:-',
'-resize', 512,
'-quality', 10,
'jpg:-',
];
const convert = spawn('convert', args);
reader.pipe(convert.stdin).on('error', done);
convert.stdout.pipe(writer).on('error', done);
writer.on('finish', () => done(null, `Processed at ${new Date()}`));
};
export { test, process, notify };
const process = (job, done) => {
done(null, { tool: `Processed at ${new Date()} with convert`});
};
const test = path => /.*\..*/.test(path);
export { test, process };
const process = (job, done) => {
done('Couldn"t process');
};
const test = path => /error/.test(path);
export { test, process };
const process = (job, done) => {
done(null, { tool: `Processed at ${new Date()} with value ${job.feeded}`});
};
const test = path => /.*feed.*/.test(path);
const feed = true;
export { test, process, feed };
import QueueClient from './queue-client';
class Notifier extends QueueClient {
constructor() {
super('propagation', 'locked');
}
process(job, { ack, queueName }) {
ack();
if (queueName === 'propagation') {
if (job.locked === 'true') {
this.unsubscribe('locked');
} else {
this.subscribe('locked');
}
} else {
job.notifiedAt = new Date();
job.notified = true;
this.publish('notified', job);
this.publish('completed', job);
}
}
}
new Notifier();
{
"name": "distributing-work",
"version": "1.0.0",
"scripts": {
"watcher": "nodemon --exec babel-node watcher.js",
"processor": "nodemon --exec babel-node processor.js",
"notifier": "nodemon --exec babel-node notifier.js",
"server": "nodemon --exec babel-node server.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"chokidar": "^2.0.3",
"query-string": "^6.1.0",
"stomp-client": "^0.9.0"
},
"devDependencies": {
"babel-cli": "^6.26.0",
"babel-eslint": "^8.2.3",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"babel-preset-env": "^1.7.0",
"eslint": "^4.19.1",
"eslint-config-amiga": "^1.9.1",
"nodemon": "^1.17.5"
}
}
import QueueClient from './queue-client';
import console from './console';
import registry from './registry';
class Processor extends QueueClient {
constructor() {
super('queued');
}
process(job, { ack }) {
if (job.feeded === 'false') {
// IMPROVE
return ack();
}
const processor = registry[job.processor];
processor.process(job, (err, meta) => {
ack();
if (err) {
job.err = err;
console.error(err);
this.publish('errored', job);
} else {
if (processor.notify) {
this.publish('locked', job);
} else {
job.processedAt = new Date();
job.meta = JSON.stringify(meta);
this.publish('completed', job);
}
}
});
}
}
console.log(' [*] Waiting for jobs. To exit press CTRL+C');
new Processor();
import Stomp from 'stomp-client';
import { activemq } from './config.json';
import console from './console';
const hostname = require('os').hostname();
class QueueClient {
constructor(...queueNames) {
this.queueNames = queueNames;
this.client = new Stomp(activemq.host, activemq.port, activemq.username, activemq.password, '1.1');
this.client.connect(this.connected.bind(this));
}
inspect(job) {
const cloned = JSON.parse(JSON.stringify(job));
['expires', 'destination', 'subscription', 'priority', 'message-id', 'timestamp', 'redelivered'].forEach(i => delete cloned[i]);
return JSON.stringify(cloned);
}
connected() {
this.subscribe();
}
unsubscribe(...queueNames) {
if (!queueNames.length) {
queueNames = this.queueNames;
}
queueNames.forEach( (queueName) => {
const subscriptionId = `${hostname}-${queueName}`;
const options = { 'activemq.prefetchSize': 1, ack: 'client-individual', id: subscriptionId };
this.client.unsubscribe(`/queue/${activemq.queues[queueName]}`, options);
});
}
subscribe(...queueNames) {
if (!queueNames.length) {
queueNames = this.queueNames;
}
queueNames.forEach( (queueName) => {
const subscriptionId = `${hostname}-${queueName}`;
const options = { 'activemq.prefetchSize': 1, ack: 'client-individual', id: subscriptionId };
const cb = (body, headers) => {
const id = headers['message-id'];
const ack = () => this.client.ack(id, subscriptionId);
const nack = () => {
console.error('nack');
// this.client.nack(id, subscriptionId);
};
console.log(`Consuming at ${queueName} => ${this.inspect(headers)}`);
this.process(headers, { ack, nack, queueName });
};
this.client.subscribe(`/queue/${activemq.queues[queueName]}`, cb, options);
});
}
process(job) {
throw 'This is a template method meant to be overriten by subclases';
}
publish(queueName, job) {
console.log(`Publish at ${queueName} => ${this.inspect(job)}`);
this.client.publish(`/queue/${activemq.queues[queueName]}`, job, job);
const data = { ...job, queue: queueName };
this.client.publish(`/queue/${activemq.queues.echo}`, data, data);
}
}
export default QueueClient;
const exports = { };
const modules = [
"error-processor",
"feed-processor",
"convert-processor",
"dummy-processor",
].map(name => {
const module = require(`./${name}`);
module.name = name;
exports[name] = module;
return module;
});
const find = fullpath => modules.find(module => module.test(fullpath));
export default { ...exports, find };
import http from 'http';
import config from './config.json';
import console from './console';
import QueueClient from './queue-client';
import queryString from 'query-string';
const db = {
jobs: {},
locked: false
};
class ServerQueueClient extends QueueClient {
constructor() {
super('echo');
}
process(job, { ack }) {
ack();
if (job.base) {
let previous = db.jobs[job.base];
db.jobs[job.base] = Object.assign(previous || {}, job);
}
}
}
const queue = new ServerQueueClient();
const server = http.createServer((req, res) => {
const params = queryString.parse(req.url.substring(1));
if (params.toggle) {
db.locked = !db.locked;
queue.publish('propagation', { locked: db.locked });
res.writeHead(302, { 'Location': '/' });
} else if (params.feed && params.value) {
console.log(params);
let job = db.jobs[params.feed];
if (job && job.feeded === 'false') {
job.feeded = params.value;
queue.publish('queued', job);
}
res.writeHead(302, { 'Location': '/' });
} else {
res.writeHead(200, { 'Content-Type': 'text/html' });
let items = Object.values(db.jobs).map(job => `<li>${queue.inspect(job)}</li>`);
res.write(`
<h1>locked? ${db.locked}</h1>
<br/>
<a href="/?toggle">toggle</a>
<ol>
${items}
</ol>
`);
}
res.end();
});
console.log(`listening on port ${config.webserver.port}`);
server.listen(config.webserver.port);
import fs from 'fs';
import path from 'path';
import chokidar from 'chokidar';
import console from './console';
import { watched } from './config.json';
import registry from './registry';
import QueueClient from './queue-client';
class Watcher extends QueueClient {
constructor() {
super('completed', 'errored');
}
connected() {
super.connected();
chokidar.watch(`${watched}/*.*`, {
ignoreInitial: true,
persistent: true,
}).on('add', fullpath => {
const processor = registry.find(fullpath);
const tokens = path.parse(fullpath);
let job = { base: tokens.base, processor: processor.name };
if (processor.feed) { job.feeded = false; }
if (processor.notify) { job.notified = false; }
this.publish('queued', job);
fs.rename(fullpath, `${tokens.dir}/queued/${tokens.base}`, (err) => {
if (err) { throw err; }
});
});
}
process(job, { nack, ack, queueName }) {
fs.rename(`${watched}/queued/${job.base}`, `${watched}/${queueName}/${job.base}`, () => (Math.random() > 0.9) ? nack() : ack());
}
}
console.log(' [*] Waiting for drops. To exit press CTRL+C');
new Watcher();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment