Skip to content

Instantly share code, notes, and snippets.

@SaladHut
Last active May 27, 2022 05:11
Show Gist options
  • Save SaladHut/fe8005634a8649d258aeb63c81bc7f58 to your computer and use it in GitHub Desktop.
Save SaladHut/fe8005634a8649d258aeb63c81bc7f58 to your computer and use it in GitHub Desktop.
A simple task manager with concurrency support.

Basic task manager

This is a work in progress.

import { MLITaskGroup } from './MLITaskGroup';
//import { MLIResume } from './MLIResume';
//FIXME Use MLIResume to update progress.
/**
 * A unit of asynchronous work scheduled inside an MLITaskGroup.
 *
 * The task callback is used as a Promise executor, i.e. it is invoked as
 * `callback( resolve, reject )`, and its outcome settles `task.result`.
 * Within a group, tasks created with the same truthy `resultKey` are
 * de-duplicated: constructing a second one returns the first instance.
 */
export class MLITask {
  /** Lifecycle states of a task. */
  static status = {
    INITIAL: 0,
    QUEUED: 1,
    STARTED: 2,
    RESOLVED: 3,
    REJECTED: 4,
  };

  /**
   * Priority presets. HIGHER and LOWER are symbolic requests that the
   * constructor turns into a number relative to the group's current tasks.
   */
  static priority = {
    DEFAULT: 0,
    HIGH: 1000000,
    MAX: Infinity,
    HIGHER: Symbol('HIGHER'),
    LOWER: Symbol('LOWER'),
  };

  key;
  callback;
  group;
  status = MLITask.status.INITIAL;
  resolve;
  reject;
  result;
  __priority = MLITask.priority.DEFAULT;

  /**
   * Changing the priority re-evaluates where the task stands: a task that has
   * not started yet is either (re)inserted into the queue or started.
   */
  set priority( newPriority ){
    this.__priority = newPriority;
    //FIXME this should also start the task if the new priority would exceed into the concurrency limit of the running priorities.
    this.__prioritize();
  }

  get priority(){
    return this.__priority;
  }

  /**
   * Queue or start this task according to its priority.
   * Tasks that are already STARTED, RESOLVED or REJECTED are left alone.
   */
  __prioritize(){
    const group = this.group;

    if( this.status === MLITask.status.QUEUED ){
      // Pull the task out of the queue so it can be re-inserted below.
      this.status = MLITask.status.INITIAL;
      const queuedAt = group.queue.indexOf( this );
      // Guard: splice( -1, 1 ) would silently remove the last queued task.
      if( queuedAt !== -1 ){
        group.queue.splice( queuedAt, 1 );
      }
    }

    if( this.status !== MLITask.status.INITIAL ){
      return;
    }

    // Count the current running tasks with equal or higher priority.
    const prioritizedCount = [ ...group.runningTasks ]
      .reduce(( count, running ) => running.priority >= this.priority ? count + 1 : count, 0 );

    if( prioritizedCount >= group.concurrentLimit ){
      // No slot available for lower or equal priority: queue this task
      // right before the first queued task with a strictly lower priority.
      const insertAt = group.queue.findIndex(( queued ) => !( queued.priority >= this.priority ));
      if( insertAt === -1 ){
        group.queue.push( this );
      } else {
        group.queue.splice( insertAt, 0, this );
      }
      this.status = MLITask.status.QUEUED;
      console.debug(`๐ŸŽ›๏ธ queue(${ group.queue.indexOf( this )})`, this.key, group.concurrent );
    } else {
      // This priority is higher (then forced the way in) or have available running slot.
      // NOTE: a forced-in task adds a worker loop, so group.concurrent may
      // exceed concurrentLimit until that loop finds the queue empty.
      MLITask.addConcurrent( this );
    }
  }

  /**
   * @param {Function} taskCallback Promise executor `( resolve, reject ) => void`.
   * @param {*} resultKey De-duplication key within the group. A falsy key
   *   disables de-duplication. Defaults to a fresh Symbol.
   * @param {number|symbol} priority A number, or MLITask.priority.HIGHER /
   *   MLITask.priority.LOWER to derive one from the group's current tasks.
   * @param {*} groupKey Key of the MLITaskGroup the task belongs to.
   * @returns {MLITask} The new task, or the existing task registered under
   *   `resultKey` in that group.
   */
  constructor( taskCallback,
    resultKey = Symbol('resultKey'),
    priority = MLITask.priority.DEFAULT,
    groupKey = MLITaskGroup.DefaultGroupKey ){
    const group = MLITaskGroup.map( groupKey );
    if( resultKey ){
      if( group.resultMap.has( resultKey )){
        return group.resultMap.get( resultKey );
      }
      group.resultMap.set( resultKey, this );
    }

    this.group = group;
    this.key = resultKey;
    this.callback = taskCallback;

    // Symbols throw a TypeError when implicitly converted inside a template
    // literal, and the default key IS a Symbol: convert explicitly for logs.
    const label = String( resultKey );

    if( priority === MLITask.priority.HIGHER ){
      // One above the highest finite priority currently running.
      priority = 1 + [ ...group.runningTasks ]
        .reduce(( highest, running ) => Math.max( highest, running.priority === Infinity ? 0 : running.priority ), 0 );
      console.debug(`๐ŸŽ›๏ธ [${ priority } :HIGHER](${ label })` );
    } else if( priority === MLITask.priority.LOWER ){
      // Below everything that is running, then below everything queued.
      priority = [ ...group.runningTasks ]
        .reduce(( lowest, running ) => Math.min( lowest, running.priority ), 0 ) - 1;
      priority = group.queue
        .reduce(( lowest, queued ) => Math.min( lowest, queued.priority ), priority ) - 1;
      console.debug(`๐ŸŽ›๏ธ [${ priority } :LOWER](${ label })` );
    } else {
      console.debug(`๐ŸŽ›๏ธ [${ priority } :PRIORITY] (${ label })` );
    }

    // Create the result promise BEFORE assigning the priority: the setter may
    // start the task right away, and resolve/reject must exist by then.
    this.result = new Promise(( resolveResult, rejectResult ) =>{
      this.resolve = resolveResult;
      this.reject = rejectResult;
    });

    // Goes through the setter, which queues or starts the task.
    this.priority = priority;
  }

  /**
   * Worker loop: runs `currentTask`, then keeps taking tasks from the head of
   * the group's queue until it is empty. `group.concurrent` counts the worker
   * loops that are currently alive.
   *
   * @param {MLITask} currentTask First task to run in this worker loop.
   * @returns {Promise<void>} Resolves when the loop found the queue empty.
   */
  static async addConcurrent( currentTask ){
    const group = currentTask.group;
    group.concurrent++;

    let task = currentTask;
    do {
      const label = String( task.key ); // keys may be Symbols, see constructor
      console.debug(`๐ŸŽ›๏ธ [${ group.concurrent } :CON] >>> run(${ label })`);
      try {
        group.runningTasks.add( task );
        task.status = MLITask.status.STARTED;
        // The callback is a Promise executor; a synchronous throw rejects too.
        const result = await new Promise( task.callback );
        task.status = MLITask.status.RESOLVED;
        task.resolve( result );
      } catch( err ){
        task.status = MLITask.status.REJECTED;
        task.reject( err );
      } finally {
        console.debug(`๐ŸŽ›๏ธ <<< finished(${ label })`);
        group.runningTasks.delete( task );
      }
    } while (( task = group.queue.shift()));

    group.concurrent--;
    console.debug(`๐ŸŽ›๏ธ [${ group.concurrent } :CON]`);
  }

  /**** PROGRESS ****/
  __progressPromise = null;
  __progressResolve;
  //__progressReject;

  /** Arm a fresh promise that the next `progress` assignment resolves. */
  __makeAPromise(){
    this.__progressPromise = new Promise(( res ) =>{
      this.__progressResolve = res;
    });
  }

  /**
   * A promise for the next progress value. It is created lazily, on first read.
   */
  get progress(){
    if( !this.__progressPromise ){
      this.__makeAPromise();
    }
    return this.__progressPromise;
  }

  // Callback will set task.progress, if they want to.
  // The value is only delivered when somebody has read `progress` before;
  // a value below 1 re-arms a new promise for the following update.
  set progress( newProgress ){
    if( this.__progressPromise ){
      this.__progressResolve( newProgress );
      if( newProgress < 1 ){
        this.__makeAPromise();
      }
    }
  }
}
/**
 * A named pool of tasks sharing one queue and one concurrency limit.
 * Groups are registered by key in `MLITaskGroup.allGroups`; there is at most
 * one instance per key.
 */
export class MLITaskGroup /* FIXME extends MLITask */ {
  static DefaultLimit = 5;
  static DefaultGroupKey = Symbol('default');
  static allGroups = new Map();

  /**
   * Look up the group registered under `key`, creating it when missing.
   * `limit` is only used when a new group has to be created.
   */
  static map( key, limit ){
    const registered = this.allGroups.get( key );
    if( registered == null ){
      return new MLITaskGroup( key, limit );
    }
    return registered;
  }

  resultMap = new Map();
  runningTasks = new Set();
  /* TODO Global task queue & group priority. */
  queue = [];
  concurrentLimit;
  concurrent = 0;
  key;

  /**
   * Constructing with a key that is already registered does not create a
   * second group: the registered one gets `concurrentLimit` assigned and is
   * returned instead.
   */
  constructor( key = MLITaskGroup.DefaultGroupKey, concurrentLimit = MLITaskGroup.DefaultLimit ){
    const registry = MLITaskGroup.allGroups;

    if( !registry.has( key )){
      // First group for this key: initialise and register it.
      this.key = key;
      this.concurrentLimit = concurrentLimit;
      registry.set( key, this );
      return;
    }

    // Known key: update the limit of the registered group and hand it back.
    const existing = registry.get( key );
    existing.concurrentLimit = concurrentLimit;
    return existing;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment