Skip to content

Instantly share code, notes, and snippets.

@Eunovo
Last active January 9, 2022 10:13
Show Gist options
  • Save Eunovo/c062954fd17ecee45a36e5c7ae0cd925 to your computer and use it in GitHub Desktop.
Save Eunovo/c062954fd17ecee45a36e5c7ae0cd925 to your computer and use it in GitHub Desktop.
Code describing a system to handle CPU-intensive jobs on NodeJS servers
class JobProcessor {
private assignedJobs = new Map<String, any>();
private queue = new Queue<any>();
private nWorkers = 5;
async loadOutstandingJobs() {
// load 'pending' or 'processing' jobs from DB
const jobs = await services.Job
.findMany({
$or: [
{ status: 'pending' }, { status: 'processing' }
]
});
jobs.forEach(job => this.queue.enqueue(job));
}
async registerJob(job: any) {
// save job to DB before it is added to queue
const _id = await services.Job.create({ ...job, status: 'pending' });
this.queue.enqueue({ ...job, _id });
}
async processJobs() {
const workers = new WorkerPool(this.nWorkers);
workers.init();
workers.on('message', async ({ id, message, status, data }) => {
if (message === WorkerMessage.job_complete) {
const job = this.assignedJobs.get(id);
this.assignedJobs.set(id, null);
// update job status
services.Job
.updateOne({ status, data }, { _id: job._id });
}
const newJob: any = await this.queue.dequeue();
workers.send(id, newJob);
this.assignedJobs.set(id, newJob);
// update job status
services.Job
.updateOne({ status: 'processing' }, { _id: newJob._id });
});
workers.on('exit', (id) => {
const ongoingJob = this.assignedJobs.get(id);
if (!ongoingJob) return;
// Re-queue the job that wasn't finished
this.queue.enqueue(ongoingJob);
});
}
}
export const jobProcessor = new JobProcessor();
export class Observable<T> {
private subscribers: Array<(data: T) => void> = [];
push(data: T) {
this.subscribers
.forEach((listener) => listener(data));
}
subscribe(listener: (data: T) => void) {
this.subscribers = [...this.subscribers, listener];
return () => {
this.subscribers = this.subscribers
.filter((value) => value !== listener);
}
}
}
import { Observable } from "./Observable";
enum QueueEvents {
enqueue = 'enqueue',
dequeue = 'dequeue'
}
export class Queue<T> {
private observable: Observable<QueueEvents> = new Observable();
private items: T[] = [];
enqueue(item: T) {
this.items.push(item);
this.observable.push(QueueEvents.enqueue);
}
async dequeue() {
if (this.items.length > 0) {
const currentItem = this.items[0];
this.items = this.items.filter((_, index) => index !== 0);
this.observable.push(QueueEvents.dequeue);
return currentItem;
}
return new Promise((resolve) => {
const unsubscribe = this.observable.subscribe(async (message) => {
if (message !== QueueEvents.enqueue) return;
resolve(await this.dequeue());
unsubscribe();
});
})
}
}
import { workerData, parentPort } from "worker_threads";
import { WorkerMessage } from "./WorkerMessage";
parentPort.on('message', async (job) => {
const { data } = job;
try {
// process job here
parentPort.postMessage({
message: WorkerMessage.job_complete,
status: 'completed',
data: { ...data, resultId }
});
} catch (error) {
parentPort.postMessage({
message: WorkerMessage.job_complete,
status: 'failed',
data: { ...data, error: error.message }
});
}
});
parentPort.postMessage({ message: WorkerMessage.request_job });
export enum WorkerMessage {
request_job = 'request_job',
job_complete = 'job_complete'
}
import { v4 } from "uuid";
import { Worker } from "worker_threads";
import { Observable } from "./Observable";
export class WorkerPool {
private observable = new Observable<{
event: "message" | "exit",
data?: any
}>();
private workers: Map<string, Worker> = new Map();
constructor(
private nWorkers: number
) { }
init() {
for (let i = 0; i < this.nWorkers; i++) {
this.createWorker();
}
}
private createWorker() {
const worker = new Worker(`${__dirname}/worker.js`);
const id = v4();
this.workers.set(id, worker);
worker.on("message", (value) => {
this.observable.push({ event: "message", data: { id, ...value } });
});
worker.on("exit", () => {
this.observable.push({ event: "exit" });
this.workers.delete(id);
// Create another worker to replace the closing worker
this.createWorker();
})
}
send(id: string, data: any) {
const worker = this.workers.get(id);
worker?.postMessage(data);
}
on(evt: string, handler: Function) {
this.observable.subscribe((value) => {
const { event, data } = value;
if (evt === event) {
handler(data);
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment