Skip to content

Instantly share code, notes, and snippets.

@Jonathan-Law Jonathan-Law/Batch.ts forked from ccnokes/Batch.ts
Last active Oct 2, 2019

Embed
What would you like to do?
A class that manages the logic of batching items together within a timeframe and then passing those items to a function. Sandbox: https://codesandbox.io/s/v86lv7k89l
/**
 * Signature of the function that fulfils a Deferred's promise.
 *
 * It is generic per call (rather than tied to the Deferred's own type
 * parameter) so existing callers that settle with a differently-typed value
 * keep compiling. Like the callback handed to a Promise executor, it returns
 * nothing.
 */
type Resolve = <T>(value?: T | PromiseLike<T>) => void;

/** Signature of the function that rejects a Deferred's promise; returns nothing. */
type Reject = (reason?: any) => void;

/**
 * Deferred
 *
 * A promise whose `resolve` / `reject` functions are exposed as instance
 * properties, so it can be settled from outside the executor at a later time.
 * The instance is itself thenable: `then` and `catch` forward to the
 * underlying promise.
 */
export class Deferred<T> {
  /** The underlying promise that `resolve` / `reject` settle. */
  promise: Promise<T>;

  /**
   * Rejects the underlying promise. Has no effect once the promise is settled.
   * @param reason The reason the promise was rejected.
   */
  reject!: Reject;

  /**
   * Fulfils the underlying promise, or makes it follow the given thenable.
   * Has no effect once the promise is settled.
   * @param value The fulfilment value, or a promise-like to adopt.
   */
  resolve!: Resolve;

  /**
   * Attaches callbacks for the resolution and/or rejection of the underlying promise.
   * @param onfulfilled The callback to execute when the promise is resolved.
   * @param onrejected The callback to execute when the promise is rejected.
   * @returns A Promise for the completion of whichever callback is executed.
   */
  then: <TResult1 = T, TResult2 = never>(
    onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null,
    onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null
  ) => Promise<TResult1 | TResult2>;

  /**
   * Attaches a callback for only the rejection of the underlying promise.
   * @param onrejected The callback to execute when the promise is rejected.
   * @returns A Promise for the completion of the callback.
   */
  catch: <TResult = never>(
    onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | undefined | null
  ) => Promise<T | TResult>;

  constructor() {
    // The executor runs synchronously, so both settle functions are assigned
    // before the constructor returns (hence the `!` on their declarations).
    this.promise = new Promise((resolve, reject) => {
      // The executor's resolve is bound to T; the cast widens it to the
      // per-call generic signature declared by Resolve.
      this.resolve = resolve as Resolve;
      this.reject = reject;
    });
    // Bound copies let callers pass `deferred.then` / `deferred.catch` around
    // detached from the instance.
    this.then = this.promise.then.bind(this.promise);
    this.catch = this.promise.catch.bind(this.promise);
  }

  /** @returns The underlying promise (the same object as `promise`). */
  toPromise(): Promise<T> {
    return this.promise;
  }
}
/**
 * Splits a flat array into consecutive chunks of at most `chunkSize` items,
 * e.g. chunkArray([1,2,3], 1) ==> [[1],[2],[3]].
 *
 * The input array is not modified. An empty input yields an empty result
 * (no empty chunk is produced). A `chunkSize` below 1 (or NaN) degrades to
 * one item per chunk.
 *
 * @param arr The items to split.
 * @param chunkSize Maximum number of items per chunk.
 * @returns The chunks, in the original item order.
 */
function chunkArray<T>(arr: readonly T[], chunkSize: number): T[][] {
  const chunks: T[][] = [];
  for (const item of arr) {
    const last = chunks[chunks.length - 1];
    // Start a new chunk when there is none yet or the current one is full.
    if (last !== undefined && last.length < chunkSize) {
      last.push(item);
    } else {
      chunks.push([item]);
    }
  }
  return chunks;
}
/**
 * One queued entry of a Batch: the item itself, the Deferred whose promise
 * was returned to the caller of `add`, and the per-item options given to `add`.
 */
interface BatchItem<ItemType = string | number, OptionsType = {}> {
// The value that was passed to `add`.
item: ItemType;
// Deferred created by `add`; the batch processor is expected to settle it.
deferred: Deferred<ItemType>;
// Per-item options passed to `add` (an empty object when omitted).
options: OptionsType;
}
/**
 * Collects items added within a time window and hands them to a processing
 * function in groups.
 *
 * Each `add` queues the item together with a Deferred and makes sure a timer
 * of `options.wait` milliseconds is pending. When the timer fires, the queue
 * is handed to `processBatch`:
 *  - `concurrent: true`  — the whole queue is split into chunks of
 *    `batchSize` and every chunk is processed at once;
 *  - `concurrent: false` — only the first `batchSize` items are processed;
 *    the rest wait for the next timer.
 *
 * `processBatch` is responsible for settling each item's `deferred`.
 * Errors from `processBatch` (thrown or rejected) are logged with
 * `console.error` and do not stop later runs.
 */
export class Batch<ItemType> {
  /** Items waiting for the next run, in insertion order. */
  private queue: BatchItem<ItemType>[] = [];
  /** Handle of the pending timer, or null when no run is scheduled. */
  private timerId: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param processBatch Receives one group of queued items and returns a
   *   promise that settles when the group is done.
   * @param options `wait`: delay in ms before a run; `batchSize`: maximum
   *   items per group; `concurrent`: whether all groups are processed at once.
   */
  constructor(
    private processBatch: (batch: BatchItem<ItemType>[]) => Promise<any>,
    private options: { wait: number; batchSize: number; concurrent: boolean }
  ) {}

  /**
   * Queues an item and schedules a run if none is pending.
   * @param item The item to batch.
   * @param options Per-item options forwarded to `processBatch`.
   * @returns The promise of the item's Deferred; it settles only when
   *   `processBatch` settles that Deferred.
   */
  add(item: ItemType, options: object = {}): Promise<ItemType> {
    const deferred = new Deferred<ItemType>();
    this.queue.push({ item, deferred, options });
    this.scheduleRun();
    return deferred.toPromise();
  }

  /**
   * Removes the first queued entry whose item is strictly equal to `item`.
   * The removed entry's Deferred is left unsettled, so the promise returned
   * by the matching `add` stays pending.
   * @param item The item to remove.
   * @returns An array holding the removed entry, or an empty array when the
   *   item is not queued.
   */
  delete(item: ItemType): BatchItem<ItemType>[] {
    const index = this.queue.findIndex(el => el.item === item);
    // findIndex yields -1 on a miss; splice(-1, 1) would drop the LAST entry.
    if (index === -1) {
      return [];
    }
    return this.queue.splice(index, 1);
  }

  /** Starts the wait timer unless one is already pending. */
  private scheduleRun(): void {
    if (this.timerId !== null) {
      return;
    }
    this.timerId = setTimeout(() => {
      // Clear the handle first so a failure during the run can never leave
      // the scheduler stuck with a stale timer id.
      this.timerId = null;
      void this.run();
    }, this.options.wait);
  }

  /**
   * Processes the queue according to `options.concurrent`.
   * @returns A promise that resolves once every group started by this run
   *   has finished (successfully or not).
   */
  private run(): Promise<void> {
    // Everything may have been deleted since the timer was started.
    if (this.queue.length === 0) {
      return Promise.resolve();
    }
    if (this.options.concurrent) {
      const batches: BatchItem<ItemType>[][] = chunkArray(this.queue, this.options.batchSize);
      this.queue = [];
      return Promise.all(batches.map(batch => this.dispatch(batch))).then(() => undefined);
    }
    return this.dispatch(this.queue.splice(0, this.options.batchSize));
  }

  /**
   * Runs `processBatch` on one group, logs any failure, then schedules
   * another run if items are still queued.
   */
  private dispatch(batch: BatchItem<ItemType>[]): Promise<void> {
    let pending: Promise<unknown>;
    try {
      pending = Promise.resolve(this.processBatch(batch));
    } catch (err) {
      // Treat a synchronous throw exactly like a rejected promise.
      pending = Promise.reject(err);
    }
    return pending
      .catch(err => {
        if (err) {
          console.error(err);
        }
      })
      .then(() => {
        if (this.queue.length > 0) {
          this.scheduleRun();
        }
      });
  }
}
// Demo: queue a handful of numbers, remove one before the timer fires, and
// log each result as its promise resolves.
const demoOptions = { wait: 60, batchSize: 3, concurrent: true };

const batch = new Batch(tasks => {
  console.log(tasks);
  // Settle every task immediately; each Deferred is thenable, so Promise.all
  // can wait on the Deferreds directly.
  const settled = tasks.map(task => {
    task.deferred.resolve(`${task.item} DONE!`);
    return task.deferred;
  });
  return Promise.all(settled);
}, demoOptions);

for (const value of [1, 2, 3, 4]) {
  batch.add(value).then(console.log);
}
batch.delete(4);
batch.add(5).then(console.log);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.