This is a work in progress.
Last active
May 27, 2022 05:11
-
-
Save SaladHut/fe8005634a8649d258aeb63c81bc7f58 to your computer and use it in GitHub Desktop.
A simple task manager with concurrency support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { MLITaskGroup } from './MLITaskGroup'; | |
//import { MLIResume } from './MLIResume'; | |
//FIXME Use MLIResume to update progress. | |
/**
 * A keyed, prioritized unit of work that runs inside an MLITaskGroup.
 *
 * Constructing a task schedules it immediately: it either starts running or is
 * placed into the group's queue, depending on the group's `concurrentLimit`
 * and on the priorities of the tasks that are currently running. The outcome
 * of the callback is exposed through the `result` promise.
 */
export class MLITask {

  /** Life-cycle states stored in `task.status`. */
  static status = {
    INITIAL: 0,
    QUEUED: 1,
    STARTED: 2,
    RESOLVED: 3,
    REJECTED: 4,
  };

  /**
   * Named priorities. `HIGHER` and `LOWER` are relative markers that are
   * converted to concrete numbers when they are assigned to a task.
   */
  static priority = {
    DEFAULT: 0,
    HIGH: 1000000,
    MAX: Infinity,
    HIGHER: Symbol('HIGHER'),
    LOWER: Symbol('LOWER'),
  };

  /** Key under which this task is registered in `group.resultMap`. */
  key;
  /** Executor handed to `new Promise( callback )` when the task starts. */
  callback;
  /** The MLITaskGroup this task belongs to. */
  group;
  status = MLITask.status.INITIAL;
  /** Settles `result` successfully. */
  resolve;
  /** Settles `result` with an error. */
  reject;
  /** Promise for the value produced by `callback`. */
  result;

  __priority = MLITask.priority.DEFAULT;

  /**
   * Change the priority and re-evaluate where the task belongs (running or
   * queued). Accepts a number or one of the relative markers
   * `MLITask.priority.HIGHER` / `MLITask.priority.LOWER`.
   */
  set priority( newPriority ){
    // Relative markers are Symbols; storing one verbatim would make every later
    // numeric comparison against this task throw a TypeError.
    this.__priority = this.__resolvePriority( newPriority );
    //FIXME this should also start the task if the new priority would exceed into the concurrency limit of the running priorities.
    this.__prioritize();
  }

  get priority(){
    return this.__priority;
  }

  /**
   * Convert a priority argument into a concrete number.
   *
   * - `HIGHER`: one more than the highest finite priority among running tasks
   *   (never less than 1).
   * - `LOWER`: below every running and queued task (never more than -2).
   * - anything else is returned unchanged.
   */
  __resolvePriority( priority ){
    const { group } = this;

    if( priority === MLITask.priority.HIGHER ){
      // Infinite priorities are ignored so that the result stays finite.
      return 1 + [...group.runningTasks ]
        .reduce(( pv, r ) => Math.max( pv, r.priority === Infinity ? 0 : r.priority ), 0 );
    }

    if( priority === MLITask.priority.LOWER ){
      const belowRunning = [...group.runningTasks ]
        .reduce(( pv, r ) => Math.min( pv, r.priority ), 0 ) - 1;
      return group.queue
        .reduce(( pv, r ) => Math.min( pv, r.priority ), belowRunning ) - 1;
    }

    return priority;
  }

  /**
   * Place the task according to its current priority: start it when a slot is
   * available (or when it outranks the running tasks), otherwise insert it
   * into the group's queue. Tasks that already started or finished are left
   * untouched.
   */
  __prioritize(){
    const { group } = this;

    if( this.status === MLITask.status.QUEUED ){
      // Take the task out of the queue so it can be re-inserted at the right spot.
      this.status = MLITask.status.INITIAL;
      const queuedAt = group.queue.indexOf( this );
      // Guard: splice( -1, 1 ) would silently drop the last queued task.
      if( queuedAt !== -1 ){
        group.queue.splice( queuedAt, 1 );
      }
    }

    if( this.status !== MLITask.status.INITIAL ){
      return;
    }

    // Count the current running tasks with equal or higher priority.
    const prioritizedCount = [...group.runningTasks ]
      .reduce(( pv, r ) => r.priority >= this.priority ? pv + 1 : pv, 0 );

    // No slot available for lower or equal priority, queue this.
    if( prioritizedCount >= group.concurrentLimit ){
      // Insert this after the last queued task with equal or higher priority.
      const insertAt = group.queue.findIndex( q => !( q.priority >= this.priority ));
      if( insertAt === -1 ){
        group.queue.push( this );
      } else {
        group.queue.splice( insertAt, 0, this );
      }
      this.status = MLITask.status.QUEUED;
      console.debug(`๐๏ธ queue(${ group.queue.indexOf( this )})`, this.key, group.concurrent );
    } else {
      // This priority is higher (then forced the way in) or have available running slot.
      // addConcurrent() handles task failures itself, so the returned promise
      // is intentionally not awaited.
      MLITask.addConcurrent( this );
    }
  }

  /**
   * @param {Function} taskCallback - Promise executor, called as
   *   `taskCallback( resolve, reject )` when the task starts.
   * @param {*} [resultKey] - Identity of the task inside its group. When the
   *   group already holds a task for a truthy key, that existing task is
   *   returned instead of a new one.
   * @param {number|symbol} [priority] - A number or a `MLITask.priority` entry.
   * @param {*} [groupKey] - Key of the MLITaskGroup to run in.
   */
  constructor( taskCallback,
    resultKey = Symbol('resultKey'),
    priority = MLITask.priority.DEFAULT,
    groupKey = MLITaskGroup.DefaultGroupKey ){

    const group = MLITaskGroup.map( groupKey );
    if( resultKey ){
      if( group.resultMap.has( resultKey )){
        return group.resultMap.get( resultKey );
      }
      group.resultMap.set( resultKey, this );
    }

    this.group = group;
    this.key = resultKey;
    this.callback = taskCallback;

    // Create the result promise before scheduling: assigning `priority` below
    // may start the callback right away, and resolve/reject must exist by then.
    this.result = new Promise(( resolveResult, rejectResult ) =>{
      this.resolve = resolveResult;
      this.reject = rejectResult;
    });

    // Keys are Symbols by default and a Symbol cannot be interpolated into a
    // template literal (TypeError), so convert explicitly for logging.
    const keyLabel = String( resultKey );
    const resolvedPriority = this.__resolvePriority( priority );
    if( priority === MLITask.priority.HIGHER ){
      console.debug(`๐๏ธ [${ resolvedPriority } :HIGHER](${ keyLabel })` );
    } else if( priority === MLITask.priority.LOWER ){
      console.debug(`๐๏ธ [${ resolvedPriority } :LOWER](${ keyLabel })` );
    } else {
      console.debug(`๐๏ธ [${ resolvedPriority } :PRIORITY] (${ keyLabel })` );
    }

    // Goes through the setter, which schedules the task.
    this.priority = resolvedPriority;
  }

  /**
   * Open one concurrency lane in the task's group: run `currentTask`, then
   * keep pulling tasks from the front of the group's queue until it is empty.
   *
   * Failures of a task callback reject that task's `result`; they never
   * reject the promise returned by this function.
   *
   * @param {MLITask} currentTask - First task to run in the new lane.
   */
  static async addConcurrent( currentTask ){
    const group = currentTask.group;
    group.concurrent++;

    let task = currentTask;
    do {
      const keyLabel = String( task.key ); // Symbol keys cannot be interpolated directly.
      console.debug(`๐๏ธ [${ group.concurrent } :CON] >>> run(${ keyLabel })`);
      try {
        group.runningTasks.add( task );
        task.status = MLITask.status.STARTED;
        const result = await new Promise( task.callback );
        task.status = MLITask.status.RESOLVED;
        task.resolve( result );
      } catch( err ){
        task.status = MLITask.status.REJECTED;
        task.reject( err );
      } finally {
        group.runningTasks.delete( task );
        console.debug(`๐๏ธ <<< finished(${ keyLabel })`);
      }
    } while (( task = group.queue.shift()));

    group.concurrent--;
    console.debug(`๐๏ธ [${ group.concurrent } :CON]`);
  }

  /**** PROGRESS ****/

  __progressPromise = null;
  __progressResolve;
  //__progressReject;

  /** Replace the pending progress promise with a fresh, unresolved one. */
  __makeAPromise(){
    this.__progressPromise = new Promise(( res ) =>{
      this.__progressResolve = res;
    });
  }

  /**
   * Promise for the next progress value. It is created lazily on first read.
   */
  get progress(){
    if( !this.__progressPromise ){
      this.__makeAPromise();
    }
    return this.__progressPromise;
  }

  // Callback will set task.progress, if they want to.
  /**
   * Publish a progress value. The value is dropped when nobody has read
   * `progress` yet. While the value is below 1 a new promise is armed for the
   * next update.
   */
  set progress( newProgress ){
    if( this.__progressPromise ){
      this.__progressResolve( newProgress );
      if( newProgress < 1 ){
        this.__makeAPromise();
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A named set of tasks that share one queue and one concurrency limit.
 *
 * Groups are singletons per key: every instance is registered in
 * `MLITaskGroup.allGroups`, and constructing a group with a key that is
 * already registered returns the registered instance.
 */
export class MLITaskGroup /* FIXME extends MLITask */ {

  /** Concurrency limit used when none is given for a new group. */
  static DefaultLimit = 5;
  /** Key of the group used when no key is given. */
  static DefaultGroupKey = Symbol('default');
  /** Registry of every group, by key. */
  static allGroups = new Map();

  /**
   * Look up the group registered under `key`, creating it when missing.
   * `limit` is only used when a new group is created.
   *
   * @param {*} key - Group key.
   * @param {number} [limit] - Concurrency limit for a newly created group.
   * @returns {MLITaskGroup}
   */
  static map( key, limit ){
    return this.allGroups.get( key ) ?? new MLITaskGroup( key, limit );
  }

  /** Tasks of this group, by result key. */
  resultMap = new Map();
  /** Tasks that are currently running. */
  runningTasks = new Set();
  /* TODO Global task queue & group priority. */
  /** Tasks waiting for a slot. */
  queue = [];
  concurrentLimit;
  concurrent = 0;
  key;

  /**
   * @param {*} [key] - Group key; defaults to `MLITaskGroup.DefaultGroupKey`.
   * @param {number} [concurrentLimit] - Concurrency limit. For a new group it
   *   defaults to `MLITaskGroup.DefaultLimit`. For an already registered key
   *   the existing group is returned and its limit is updated only when a
   *   limit is actually passed.
   */
  constructor( key = MLITaskGroup.DefaultGroupKey, concurrentLimit ){
    const existing = MLITaskGroup.allGroups.get( key );
    if( existing ){
      // Only reconfigure on request: an omitted limit must not reset a
      // customised group back to the default.
      if( concurrentLimit !== undefined ){
        existing.concurrentLimit = concurrentLimit;
      }
      return existing;
    }

    this.key = key;
    this.concurrentLimit = concurrentLimit ?? MLITaskGroup.DefaultLimit;
    MLITaskGroup.allGroups.set( key, this );
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment