Skip to content

Instantly share code, notes, and snippets.

@feliperohdee
Created January 28, 2021 21:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save feliperohdee/e7457453c75c7ab421d61195cf4a9931 to your computer and use it in GitHub Desktop.
Save feliperohdee/e7457453c75c7ab421d61195cf4a9931 to your computer and use it in GitHub Desktop.
Simple RXJS based worker broker
const workerThreads = require('worker_threads');
const rx = require('rxjs');
class Worker {
constructor(file, data) {
this.messageId = 0;
this.callbacks = new Map();
this.client = new workerThreads.Worker(file, {
workerData: {
id: Date.now(),
...data
}
});
this.client.on('message', message => {
const callback = this.callbacks.get(message.id);
if (callback) {
if (message.type === 'success') {
callback.success(message.data);
} else {
callback.error(new Error(message.data));
}
}
});
this.client.on('exit', code => {
if (code !== 1) {
this.callbacks.forEach(callback => {
callback.error(new Error(`exit ${code}`));
});
}
});
Worker.clients.add(this);
}
close() {
this.client.terminate();
Worker.clients.delete(this);
}
send(data = {}) {
return new rx.Observable(subscriber => {
const id = ++this.messageId;
this.callbacks.set(id, {
error: err => {
this.callbacks.delete(id);
subscriber.error(err);
},
success: data => {
this.callbacks.delete(id);
subscriber.next(data);
subscriber.complete();
}
});
this.client.postMessage({
id,
data
});
});
}
}
Worker.clients = new Set();
Worker.closeAll = () => {
Worker.clients.forEach(client => {
client.close();
});
};
module.exports = Worker;
const chai = require('chai');
const sinon = require('sinon');
const sinonChai = require('sinon-chai');
const workerThreads = require('worker_threads');
const Worker = require('./Worker');
const expect = chai.expect;
chai.use(sinonChai);
describe('Worker.js', () => {
let worker;
before(() => {
worker = new Worker('./backendWorker.js');
});
after(() => {
Worker.closeAll();
});
describe('constructor', () => {
it('should have properties', () => {
expect(worker.messageId).to.be.equal(0);
expect(worker.callbacks).to.be.instanceOf(Map);
expect(worker.client).to.be.instanceOf(workerThreads.Worker);
});
it('should export static closeAll', () => {
expect(Worker.closeAll).to.be.a('function');
});
});
describe('closeAll', () => {
beforeEach(() => {
sinon.stub(worker, 'close');
});
afterEach(() => {
worker.close.restore();
});
it('should call close to all workers', () => {
Worker.closeAll();
expect(worker.close).to.have.been.calledOnce;
});
});
describe('close', () => {
beforeEach(() => {
sinon.stub(worker.client, 'terminate');
sinon.stub(Worker.clients, 'delete');
});
afterEach(() => {
worker.client.terminate.restore();
Worker.clients.delete.restore();
});
it('should call client.terminate', () => {
worker.close();
expect(worker.client.terminate).to.have.been.calledOnce;
expect(Worker.clients.delete).to.have.been.calledOnceWithExactly(worker);
});
});
describe('send', () => {
it('should send request and wait for response', done => {
worker.send()
.subscribe(null, err => {
expect(err.message).to.equal('InexistentMethodError');
expect(worker.callbacks.size).to.equal(0);
done();
});
const callback = worker.callbacks.get(1);
expect(worker.callbacks.size).to.equal(1);
expect(Worker.clients.size).to.equal(1);
expect(callback.error).to.be.a('function');
expect(callback.success).to.be.a('function');
});
it('should increment messageId', done => {
worker.send()
.subscribe(null, err => {
expect(err.message).to.equal('InexistentMethodError');
expect(worker.callbacks.size).to.equal(0);
done();
});
const callback = worker.callbacks.get(2);
expect(worker.callbacks.size).to.equal(1);
expect(callback.error).to.be.a('function');
expect(callback.success).to.be.a('function');
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment