Skip to content

Instantly share code, notes, and snippets.

@ccnokes ccnokes/Batch.ts
Last active Apr 2, 2019

Embed
What would you like to do?
A class that manages the logic of batching items together within a timeframe and then passing those items to a function. Sandbox: https://codesandbox.io/s/v86lv7k89l
class Deferred<Type> {
promise: Promise<Type>;
// TODO type these
resolve: any;
reject: any;
then: any;
catch: any;
constructor() {
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.then = this.promise.then.bind(this.promise);
this.catch = this.promise.catch.bind(this.promise);
}
toPromise() {
return this.promise;
}
}
// takes a flat array and returns a nested one, eg. chunkArray([1,2,3], 1) ==> [[1],[2],[3]]
function chunkArray(arr: any[], chunkSize: number) {
return arr.reduce(
(aggr, item) => {
let lastArr = aggr[aggr.length - 1];
if (lastArr.length < chunkSize) {
lastArr.push(item);
} else {
aggr.push([item]);
}
return aggr;
},
[[]]
);
}
interface BatchItem<ItemType = string | number, OptionsType = {}> {
item: ItemType;
deferred: Deferred<ItemType>;
options: OptionsType;
}
export class Batch<ItemType> {
private queue = [];
private timerId = null;
constructor(
private processBatch: (batch: BatchItem<ItemType>[]) => Promise<any>,
private options: { wait: number; batchSize: number; concurrent: boolean }
) {}
add(item: ItemType, options: object = {}) {
let deferred = new Deferred<ItemType>();
this.queue.push({ item, deferred, options });
this.scheduleRun();
return deferred.toPromise();
}
delete(item: ItemType) {
let toRemoveIndex = this.queue.findIndex(el => el.item === item);
return this.queue.splice(toRemoveIndex, 1);
}
private scheduleRun() {
if (!this.timerId) {
this.timerId = setTimeout(() => {
this.run();
this.timerId = null;
}, this.options.wait);
}
}
private run() {
let doNext = () => {
if (this.queue.length > 0) {
this.scheduleRun();
}
};
if (this.options.concurrent) {
let batches = chunkArray(this.queue, this.options.batchSize);
this.queue = [];
return Promise.all(
batches.map(batch => {
this.processBatch(batch)
.then(doNext)
.catch(err => {
err && console.error(err);
doNext();
});
})
);
} else {
let batch = this.queue.splice(0, this.options.batchSize);
return this.processBatch(batch)
.then(doNext)
.catch(err => {
err && console.error(err);
doNext();
});
}
}
}
// test it out
let batch = new Batch(
tasks => {
console.log(tasks);
return Promise.all(
tasks.map(({ item, deferred }) => {
deferred.resolve(`${item} DONE!`);
return deferred;
})
);
},
{ wait: 60, batchSize: 3, concurrent: true }
);
batch.add(1).then(console.log);
batch.add(2).then(console.log);
batch.add(3).then(console.log);
batch.add(4).then(console.log);
batch.delete(4);
batch.add(5).then(console.log);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.