Skip to content

Instantly share code, notes, and snippets.

@witalobenicio
Created July 27, 2021 17:40
Show Gist options
  • Save witalobenicio/f0634b0e672d105d7d9d099ec18efbfa to your computer and use it in GitHub Desktop.
Save witalobenicio/f0634b0e672d105d7d9d099ec18efbfa to your computer and use it in GitHub Desktop.
import { EMPTY, Observable, Subject } from 'rxjs';
import { catchError, delay, mergeMap } from 'rxjs/operators';
import { DataCallback } from '@integrations/types';
import { Task } from '@models/Task';
/**
 * Snapshot of a processor's batch capacity, handed to batch-change subscribers.
 */
export interface BatchChange {
  /** Slots currently taken: the processor's batch size minus `available`. */
  used: number;

  /** Slots currently free. */
  available: number;
}
/**
 * Base class for processors that push `Task`s through an RxJS pipeline with a
 * bounded number of concurrently running executors, while tracking how many
 * batch slots are still free.
 *
 * Subclasses provide the concurrency limit (`getBatch`) and the delay applied
 * to each executor's output (`getDelay`), and call `buildObservable` with the
 * function that executes a single task before anyone calls `subscribe`.
 */
export default abstract class Processor {
  /** Incoming tasks; every value pushed here is handed to the executor. */
  protected queue: Subject<Task>;

  /** Failures raised by the executor, paired with the task that caused them. */
  protected errorQueue: Subject<{ error: Error, task: Task }>;

  /** Result stream. Assigned by `buildObservable`, not by the constructor. */
  protected pipe!: Observable<DataCallback>;

  /**
   * Number of free batch slots. Decremented for every queued task and
   * incremented for every result or error delivered through `subscribe`.
   */
  protected availableBatch: number;

  /** Listeners that are told about every change of `availableBatch`. */
  protected subscribers: ((change: BatchChange) => void)[] = [];

  /**
   * @param batch Initial number of free slots. Batch can be different
   *   depending on the type of Processor.
   */
  protected constructor(batch: number) {
    this.availableBatch = batch;
    this.errorQueue = new Subject<{ error: Error, task: Task }>();
    this.queue = new Subject<Task>();
  }

  /**
   * Builds the result stream: each queued task is run through `executor`, with
   * at most `getBatch()` executors subscribed at the same time.
   *
   * An executor failure does not terminate the stream; it is published on
   * `errorQueue` together with the offending task, and that task yields no
   * result.
   *
   * @param executor Turns one task into an observable of results.
   * @returns The stream that was built (also stored in `this.pipe`).
   */
  protected buildObservable(executor: (data: Task) => Observable<DataCallback>): Observable<DataCallback> {
    this.pipe = this.queue.pipe(
      mergeMap(
        (task: Task) => executor(task).pipe(
          delay(this.getDelay()),
          catchError((error) => {
            // Publish the declared `{ error, task }` shape so consumers of
            // `errorQueue` can tell which task failed.
            this.errorQueue.next({ error, task });
            return EMPTY;
          })
        ),
        this.getBatch()
      )
    );
    return this.pipe;
  }

  /** @returns The number of batch slots that are currently free. */
  public getAvailableBatch(): number {
    return this.availableBatch;
  }

  /**
   * Registers a listener for batch-capacity changes. The listener is called
   * once immediately with the current state, and again after every change.
   *
   * @param subscriber Callback receiving the used/available slot counts.
   */
  public subscribeToBatchChanges(subscriber: (change: BatchChange) => void): void {
    subscriber(this.currentBatchChange());
    this.subscribers.push(subscriber);
  }

  /**
   * Subscribes to results and errors. Every delivered result or error frees
   * one batch slot before the corresponding callback runs.
   *
   * @param next Called with each result emitted by the pipeline.
   * @param error Called with the error raised by a failed executor.
   * @throws Error if `buildObservable` has not been called yet.
   */
  public subscribe(next: (data: DataCallback) => void, error: (err: any) => void): void {
    if (!this.pipe) {
      throw new Error('Processor.subscribe() called before buildObservable()');
    }
    if (this.errorQueue) {
      // Callers keep receiving the raw error, exactly as before; the task is
      // only exposed to code that reads `errorQueue` directly.
      this.errorQueue.subscribe(
        this.batchHandler((failure: { error: Error, task: Task }) => error(failure.error))
      );
    }
    this.pipe.subscribe(this.batchHandler(next));
  }

  /**
   * Queues several tasks, taking one batch slot per task.
   *
   * @param tasks Tasks to enqueue, in order.
   */
  public addManyToQueue(tasks: Task[]): void {
    tasks.forEach((task) => this.addToQueue(task));
  }

  /**
   * Takes one batch slot and queues the task.
   *
   * @param task Task to enqueue.
   */
  public addToQueue(task: Task): void {
    this.changeAvailableBatch(-1);
    this.queue.next(task);
  }

  /** @returns Maximum number of tasks processed concurrently. */
  protected abstract getBatch(): number;

  /** @returns Delay passed to the RxJS `delay` operator for each task's output. */
  protected abstract getDelay(): number;

  /** Builds a fresh snapshot of the used/available slot counts. */
  private currentBatchChange(): BatchChange {
    return { used: this.getBatch() - this.availableBatch, available: this.availableBatch };
  }

  /**
   * Adjusts the free-slot counter and tells every registered listener.
   *
   * @param quantity `-1` when a slot is taken, `1` when a slot is released.
   */
  private changeAvailableBatch(quantity: -1 | 1): void {
    this.availableBatch += quantity;
    for (const subscriber of this.subscribers) {
      subscriber(this.currentBatchChange());
    }
  }

  /**
   * Wraps a callback so that one batch slot is released before it runs.
   *
   * @param func Callback to wrap.
   * @returns A function with the same parameter that frees a slot, then calls `func`.
   */
  private batchHandler<T>(func: (value: T) => void): (value: T) => void {
    return (value: T): void => {
      this.changeAvailableBatch(1);
      func(value);
    };
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment