Skip to content

Instantly share code, notes, and snippets.

@jackyef
Created November 13, 2019 11:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jackyef/494c775656c4b514723c0c0d54f1fed1 to your computer and use it in GitHub Desktop.
Save jackyef/494c775656c4b514723c0c0d54f1fed1 to your computer and use it in GitHub Desktop.
Simple TaskManager implementation for Node.js
const EventEmitter = require('events').EventEmitter;
/**
 * A unit of work wrapping a callback, plus a promise that settles with the
 * callback's outcome.
 *
 * Extends EventEmitter so callers may attach their own listeners; this class
 * does not emit any events itself.
 */
class Task extends EventEmitter {
  /**
   * @param {Function} cb - Work to perform; may return a plain value or a promise.
   * @param {string} [name='N/A'] - Human-readable label used in logs.
   * @param {string} [type=''] - Queue type; an empty string means "no queue needed".
   */
  constructor(cb, name = 'N/A', type = '') {
    super();
    this.cb = cb;
    this.name = name;
    this.type = type; // will be used to determine which queue this task should be enqueued to
    this.next = null; // can assign another task to .next for sequencing tasks
    this.started = false;

    // In case we want to await the task, we just await task.promise.
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  /**
   * @returns {string} `[Task: <callback name, or the task name when the callback is anonymous>]`.
   */
  toString() {
    return `[Task: ${this.cb.name || this.name}]`;
  }

  /**
   * Runs the callback at most once and settles `this.promise` with its outcome.
   * Calls made after the first one are no-ops.
   *
   * @returns {Promise<void>} Resolves once the callback's result has been
   *   forwarded to `this.promise`; never rejects.
   */
  async start() {
    if (this.started) {
      return;
    }
    // Flip the flag before invoking the callback so that a re-entrant or
    // repeated start() cannot run the work a second time.
    this.started = true;

    try {
      // `await` passes plain values through and unwraps promises/thenables,
      // so synchronous and asynchronous callbacks share one code path.
      this.resolve(await this.cb());
    } catch (err) {
      this.reject(err);
    }
  }
}
/**
 * Polls a backlog of tasks and runs them.
 *
 * Tasks without a `type` start as soon as the work loop sees them. Tasks with
 * a `type` need a free worker from `queues[type].availableWorkers`; they stay
 * in the backlog (in order) until one is free.
 *
 * A task is expected to expose `name`, `type`, `next`, `promise` and `start()`.
 */
class TaskManager {
  /**
   * @param {object} [options]
   * @param {number} [options.pollIntervalMs=1000] - Delay between work-loop
   *   ticks and between shutdown retries.
   */
  constructor(options) {
    const { pollIntervalMs = 1000 } = options || {};
    this.pollIntervalMs = pollIntervalMs;
    this.queues = {
      build: {
        availableWorkers: ['0.0.0.1'], // array of workers for this queue type
      },
      test: {
        availableWorkers: ['1.2.3.4'],
      },
    };
    this.backlogTasks = [];
    this.activeTasks = new Map();
    this.busyWorkers = new Map();
    // Handle of the pending work-loop timer; undefined while the loop is stopped.
    this.timeout = undefined;
  }

  /**
   * Appends a task to the backlog; it is picked up on a later work-loop tick.
   *
   * @param {object} task
   */
  scheduleTask(task) {
    this.backlogTasks.push(task);
  }

  /**
   * Starts the polling work loop. Does nothing if the loop is already running,
   * so repeated calls cannot create parallel loops.
   */
  start() {
    if (this.timeout) {
      return;
    }
    this.timeout = setTimeout(() => this.performWork(), this.pollIntervalMs);
  }

  /**
   * One work-loop tick: dispatch what can be dispatched, schedule the next
   * tick, then print the current state.
   */
  performWork() {
    console.log('[TaskManager] Checking if there are works in backlog...');
    this.dispatchBacklog();
    // schedule next work
    this.timeout = setTimeout(() => this.performWork(), this.pollIntervalMs);
    // for debugging
    this.debug();
  }

  /**
   * Tries to start every task currently in the backlog, in order. Tasks that
   * must wait for a worker are put back at the front of the backlog without
   * blocking the tasks queued behind them. Uses no timers.
   */
  dispatchBacklog() {
    if (this.backlogTasks.length === 0) {
      console.log('[TaskManager] No tasks in backlog!');
      return;
    }

    // Empty the backlog in place (keeps the array identity). Anything that
    // gets scheduled while we dispatch lands in the now-empty array and is
    // handled on the next tick.
    const pending = this.backlogTasks.splice(0);
    const stillWaiting = [];

    for (const task of pending) {
      if (!task.type) {
        // no type, this task can be performed immediately
        this.performTask(task);
        continue;
      }

      if (!Object.prototype.hasOwnProperty.call(this.queues, task.type)) {
        // No queue will ever serve this task; fail it instead of crashing the loop.
        this.rejectUnroutable(task);
        continue;
      }

      const { availableWorkers } = this.queues[task.type];
      if (availableWorkers.length === 0) {
        console.log(`[TaskManager] No worker available for task.type: ${task.type}; Will check again in next loop.`);
        stillWaiting.push(task);
        continue;
      }

      const worker = availableWorkers.shift(); // take one worker out
      // add it to the map so we know it's busy
      this.busyWorkers.set(worker, { task });
      this.performTask(task, worker);
    }

    // Waiting tasks keep their place ahead of anything scheduled meanwhile.
    this.backlogTasks.unshift(...stillWaiting);
  }

  /**
   * Fails a task whose `type` has no configured queue: logs the reason and,
   * when the task exposes `reject`, rejects its promise so awaiters do not hang.
   *
   * @param {object} task
   */
  rejectUnroutable(task) {
    const err = new Error(`No queue configured for task.type: ${task.type}`);
    console.log(`[TaskManager] Dropping task "${task.name}":`, err.message);
    if (typeof task.reject !== 'function') {
      return;
    }
    if (task.promise && typeof task.promise.catch === 'function') {
      // Already reported above; this handler only keeps the rejection from
      // being flagged as unhandled when nobody awaits the task.
      task.promise.catch(() => {});
    }
    task.reject(err);
  }

  /**
   * Starts a task and tracks it until its promise settles. Whether the task
   * succeeds or fails, it is removed from `activeTasks` and its worker (if
   * any) is handed back. On success the chained `task.next`, if set, is
   * scheduled.
   *
   * @param {object} task
   * @param {*} [worker] - Worker reserved for this task in `busyWorkers`.
   * @returns {Promise<void>} Settles after the bookkeeping is done; never rejects
   *   for task failures.
   */
  performTask(task, worker) {
    this.activeTasks.set(task, task.name);
    console.log(`started task ${task}`);

    const release = () => {
      this.activeTasks.delete(task);
      const reservation = this.busyWorkers.get(worker);
      // Only hand the worker back if it is still reserved for this task, so
      // running release() twice cannot duplicate the worker.
      if (reservation && reservation.task === task) {
        this.busyWorkers.delete(worker);
        const queue = this.queues[task.type];
        if (queue) {
          queue.availableWorkers.push(worker);
        }
      }
    };

    // The outcome is observed through task.promise below.
    task.start();

    return task.promise
      .then((result) => {
        console.log(`Task "${task.name}" finished, result:`, result);
        release();
        if (task.next) {
          // just schedule the next task and let the TaskManager decide when to do it
          this.scheduleTask(task.next);
        }
      })
      .catch((err) => {
        console.log(`Task "${task.name}" errored, err:`, err);
        // A failed task must free its slot and worker too; otherwise the
        // worker is lost and shutdown() would wait forever.
        release();
      });
  }

  /**
   * Prints the backlog, the running tasks and the busy workers.
   */
  debug() {
    console.log('[TaskManager workloop]');
    console.log('Tasks in backlog: ', this.backlogTasks.map((t) => t.name).join(', '));
    const running = Array.from(this.activeTasks.keys(), (t) => t.name);
    console.log('Task in progress: ', running.join(', '));
    if (this.busyWorkers.size > 0) {
      console.log('Busy workers:');
      for (const [worker, { task }] of this.busyWorkers) {
        console.log('-', `Worker [${worker}] is working on ${task.name}`);
      }
    } else {
      console.log('All workers are available!');
    }
    console.log('===================================');
  }

  /**
   * Stops the work loop once nothing is running. While the loop is running,
   * backlog tasks also count as unfinished, so a chained task that was just
   * scheduled is not dropped. Retries every `pollIntervalMs` until it can stop.
   *
   * @returns {Promise<string>} Resolves with a confirmation message.
   */
  shutdown() {
    console.log('[TaskManager] Trying to shutdown...');

    return new Promise((resolve) => {
      const attempt = () => {
        // A stopped loop can never drain the backlog, so only count it while
        // the loop is running.
        const queued = this.timeout ? this.backlogTasks.length : 0;

        if (this.activeTasks.size < 1 && queued < 1) {
          clearTimeout(this.timeout);
          this.timeout = undefined;
          console.log('[TaskManager] Shutdown successful!');
          resolve('[TaskManager] Shutdown successful!');
          return;
        }

        console.log('Failed to shutdown. There are still', this.activeTasks.size, 'tasks running.');
        const running = Array.from(this.activeTasks.keys(), (t) => t.name);
        console.log('Tasks:', running.join(', '));
        if (queued > 0) {
          console.log('Tasks still waiting in backlog:', this.backlogTasks.map((t) => t.name).join(', '));
        }
        setTimeout(attempt, this.pollIntervalMs);
      };

      attempt();
    });
  }
}
// Demo script. The manager below is meant to be the single shared instance.
const tm = new TaskManager();

/**
 * Creates a Task whose callback resolves with
 * "[<label>] Resolved after <delayMs>ms" once `delayMs` milliseconds passed.
 * Leaving `type` out yields a task that needs no queue.
 */
function makeDelayedTask(label, delayMs, type) {
  return new Task(
    () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(`[${label}] Resolved after ${delayMs}ms`);
      }, delayMs);
    }),
    label,
    type,
  );
}

/** Links the given tasks through `.next`, in the order given. */
function chainTasks(...tasks) {
  for (let i = 0; i + 1 < tasks.length; i += 1) {
    tasks[i].next = tasks[i + 1];
  }
}

// --- Part 1: a synchronous task, and an asynchronous task with a follow-up ---
const TaskA = new Task(() => {
  console.log('[TaskA] Just doing some synchronous operations.');
  return 'something';
}, 'TaskA');

const TaskB = new Task(() => {
  console.log('[TaskB] Starting an asynchronous operation...')
  return new Promise((done) => {
    setTimeout(() => done('[TaskB] Resolved after 3000ms'), 3000);
  });
}, 'TaskB');

const TaskC = new Task(() => {
  console.log('[TaskC] Should only be done after TaskB finishes');
  return '[TaskC] Finished!';
}, 'TaskC');

chainTasks(TaskB, TaskC);

tm.scheduleTask(TaskA);
tm.scheduleTask(TaskB);
tm.start();

// Shutdown is requested right after the first work-loop tick; the manager
// keeps retrying while tasks are still running.
setTimeout(() => tm.shutdown(), 1100);

// --- Part 2: two Build -> Deploy -> PostDeploy pipelines using the 'build' queue ---
const BuildTaskA = makeDelayedTask('BuildTaskA', 10000, 'build');
const DeployTaskA = makeDelayedTask('DeployTaskA', 3000);
const PostDeployTaskA = makeDelayedTask('PostDeployTaskA', 3000);
chainTasks(BuildTaskA, DeployTaskA, PostDeployTaskA);

const BuildTaskB = makeDelayedTask('BuildTaskB', 10000, 'build');
const DeployTaskB = makeDelayedTask('DeployTaskB', 3000);
const PostDeployTaskB = makeDelayedTask('PostDeployTaskB', 3000);
chainTasks(BuildTaskB, DeployTaskB, PostDeployTaskB);

tm.scheduleTask(BuildTaskA);
tm.scheduleTask(BuildTaskB);

// A task can also be awaited directly through its promise.
async function reportBuildTaskA() {
  console.log('Waiting for BuildTaskA...');
  await BuildTaskA.promise;
  console.log('BuildTaskA finished!');
}
reportBuildTaskA();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment