-
-
Save Jonathan-Law/35e2cf036788e4e70bb509c4c676d0c4 to your computer and use it in GitHub Desktop.
A class that manages the logic of batching items together within a timeframe and then passing those items to a function. Sandbox: https://codesandbox.io/s/v86lv7k89l
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Settles a deferred successfully; mirrors the `resolve` callback of a Promise executor. */
type Resolve<T> = (value?: T | PromiseLike<T>) => void;
/** Settles a deferred with a failure; mirrors the `reject` callback of a Promise executor. */
type Reject = (reason?: unknown) => void;

/**
 * Deferred
 *
 * A promise whose `resolve` / `reject` callbacks are exposed as instance
 * members, so it can be settled from outside the executor at a later time.
 * The instance also exposes `then` / `catch`, which makes it a thenable that
 * can be awaited or passed to `Promise.all` directly.
 */
export class Deferred<T> {
  /** The underlying promise that `resolve` / `reject` settle. */
  promise: Promise<T>;

  /**
   * Rejects the underlying promise. Has no effect once the promise has
   * already been settled.
   * @param reason The reason the promise was rejected.
   */
  // Assigned synchronously by the Promise executor in the constructor.
  reject!: Reject;

  /**
   * Resolves the underlying promise. Has no effect once the promise has
   * already been settled.
   * @param value The fulfilment value, or a promise/thenable to adopt.
   */
  // Assigned synchronously by the Promise executor in the constructor.
  resolve!: Resolve<T>;

  /**
   * Attaches callbacks for the resolution and/or rejection of the Promise.
   * @param onfulfilled The callback to execute when the Promise is resolved.
   * @param onrejected The callback to execute when the Promise is rejected.
   * @returns A Promise for the completion of which ever callback is executed.
   */
  then: Promise<T>['then'];

  /**
   * Attaches a callback for only the rejection of the Promise.
   * @param onrejected The callback to execute when the Promise is rejected.
   * @returns A Promise for the completion of the callback.
   */
  catch: Promise<T>['catch'];

  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      // The cast only widens `value` to optional so existing `resolve()` calls keep compiling.
      this.resolve = resolve as Resolve<T>;
      this.reject = reject;
    });
    // Bind so the methods keep working when detached from the instance.
    this.then = this.promise.then.bind(this.promise);
    this.catch = this.promise.catch.bind(this.promise);
  }

  /** @returns The underlying promise (the same object on every call). */
  toPromise(): Promise<T> {
    return this.promise;
  }
}
/**
 * Takes a flat array and returns a nested one made of consecutive chunks,
 * e.g. chunkArray([1, 2, 3], 1) ==> [[1], [2], [3]].
 *
 * The input array is not modified. An empty input yields an empty result
 * (no phantom empty chunk).
 *
 * @param arr The items to split.
 * @param chunkSize Maximum number of items per chunk. Fractional sizes are
 *   rounded up; non-positive or NaN sizes fall back to one item per chunk.
 * @returns The chunks, in original order; only the last may be shorter.
 */
function chunkArray<T>(arr: T[], chunkSize: number): T[][] {
  // Normalising guarantees the loop below always advances.
  const size = chunkSize > 0 ? Math.ceil(chunkSize) : 1;
  const chunks: T[][] = [];
  for (let start = 0; start < arr.length; start += size) {
    chunks.push(arr.slice(start, start + size));
  }
  return chunks;
}
/** One queued entry handed to the batch processor. */
interface BatchItem<ItemType = string | number, OptionsType = {}> {
  /** The value that was passed to `Batch.add`. */
  item: ItemType;
  /** Deferred backing the promise that `Batch.add` returned for this item. */
  deferred: Deferred<ItemType>;
  /** Per-item options that were passed to `Batch.add`. */
  options: OptionsType;
}

/**
 * Collects items added within `options.wait` milliseconds and hands them to
 * `processBatch` in groups of at most `options.batchSize`.
 *
 * With `options.concurrent` set, the whole queue is split into batches that
 * are all dispatched at once when the timer fires; otherwise one batch is
 * taken per timer tick and the next tick is scheduled when that batch
 * finishes.
 *
 * `Batch` never settles an item's deferred itself: `processBatch` is
 * responsible for resolving or rejecting `deferred` on each item it receives.
 */
export class Batch<ItemType> {
  private queue: BatchItem<ItemType>[] = [];
  private timerId: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param processBatch Called with each batch of queued items.
   * @param options `wait`: delay in ms before a run; `batchSize`: maximum
   *   items per batch; `concurrent`: dispatch all batches at once.
   */
  constructor(
    private processBatch: (batch: BatchItem<ItemType>[]) => Promise<any>,
    private options: { wait: number; batchSize: number; concurrent: boolean }
  ) {}

  /**
   * Queues an item and makes sure a run is scheduled.
   * @param item The item to queue.
   * @param options Per-item options forwarded to `processBatch`.
   * @returns The promise of the deferred stored alongside the item.
   */
  add(item: ItemType, options: object = {}): Promise<ItemType> {
    const deferred = new Deferred<ItemType>();
    this.queue.push({ item, deferred, options });
    this.scheduleRun();
    return deferred.toPromise();
  }

  /**
   * Removes the first queued entry whose item is strictly equal to `item`.
   * The removed entry's deferred is not settled, so the promise returned by
   * the matching `add` call stays pending.
   * @param item The item to remove.
   * @returns An array holding the removed entry, or an empty array when the
   *   item is not queued.
   */
  delete(item: ItemType): BatchItem<ItemType>[] {
    const index = this.queue.findIndex(el => el.item === item);
    // Guard: splice(-1, 1) would silently drop the last queued entry.
    if (index === -1) {
      return [];
    }
    return this.queue.splice(index, 1);
  }

  /** Starts the wait timer unless one is already pending. */
  private scheduleRun(): void {
    if (this.timerId !== null) {
      return;
    }
    this.timerId = setTimeout(() => {
      // Clear first so a failing run can never leave the timer marked as pending.
      this.timerId = null;
      void this.run();
    }, this.options.wait);
  }

  /** `options.batchSize` coerced to an integer of at least 1 so every batch makes progress. */
  private effectiveBatchSize(): number {
    return Math.max(1, Math.floor(this.options.batchSize) || 1);
  }

  /**
   * Drains the queue (concurrent mode) or takes one batch from its front and
   * dispatches it. Does nothing when the queue is empty.
   * @returns A promise that resolves once every dispatched batch has finished; it never rejects.
   */
  private run(): Promise<void> {
    if (this.queue.length === 0) {
      return Promise.resolve();
    }
    const size = this.effectiveBatchSize();
    let batches: BatchItem<ItemType>[][];
    if (this.options.concurrent) {
      batches = chunkArray(this.queue, size);
      this.queue = [];
    } else {
      batches = [this.queue.splice(0, size)];
    }
    const inFlight = batches
      .filter(batch => batch.length > 0)
      .map(batch => this.dispatch(batch));
    return Promise.all(inFlight).then(() => undefined);
  }

  /**
   * Runs `processBatch` for one batch, logs any failure, and schedules
   * another run if items are still queued afterwards.
   */
  private dispatch(batch: BatchItem<ItemType>[]): Promise<void> {
    const doNext = (): void => {
      if (this.queue.length > 0) {
        this.scheduleRun();
      }
    };
    let pending: Promise<unknown>;
    try {
      pending = Promise.resolve(this.processBatch(batch));
    } catch (err) {
      // Treat a synchronous throw like a rejected batch.
      pending = Promise.reject(err);
    }
    return pending.then(doNext).catch((err: unknown) => {
      if (err) {
        console.error(err);
      }
      doNext();
    });
  }
}
// Demo: a processor that logs each batch and resolves every item right away.
const batch = new Batch(
  tasks => {
    console.log(tasks);
    const settled = tasks.map(task => {
      task.deferred.resolve(`${task.item} DONE!`);
      return task.deferred;
    });
    return Promise.all(settled);
  },
  { wait: 60, batchSize: 3, concurrent: true }
);

// Queue four items, drop the last one before the timer fires, then queue a fifth.
[1, 2, 3, 4].forEach(value => {
  batch.add(value).then(console.log);
});
batch.delete(4);
batch.add(5).then(console.log);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment