Skip to content

Instantly share code, notes, and snippets.

@witalobenicio
Last active July 30, 2021 10:14
Show Gist options
  • Save witalobenicio/42c1dfe05a1fd89a2b1ffbc528a90978 to your computer and use it in GitHub Desktop.
Save witalobenicio/42c1dfe05a1fd89a2b1ffbc528a90978 to your computer and use it in GitHub Desktop.
import { Observable } from 'rxjs';
import { retry } from 'rxjs/operators';
import Processor from './Processor';
import { Task } from '@models/Task';
const BATCH = 20;
export default class ToDoProcessorQueue extends Processor {
public static getInstance(): ToDoProcessorQueue {
if (!ToDoProcessorQueue.processor) {
ToDoProcessorQueue.processor = new ToDoProcessorQueue();
}
return ToDoProcessorQueue.processor;
}
private static processor: ToDoProcessorQueue;
private batch = BATCH;
private constructor() {
super(BATCH);
this.buildObservable(this.toDoDataObservable);
}
protected getBatch(): number {
return this.batch;
}
protected getDelay(): number {
return 1;
}
private async getExecutionFromTask(task: Task): Promise<DataCallback> {
const startTime = new Date().getTime();
let data;
// PROCESS YOUR DATA BASED ON TASK TYPE
return data;
}
private toDoDataObservable(task: Task): Observable<DataCallback> {
return new Observable<DataCallback>(observer => {
this.getExecutionFromTask(task)
.then(data => {
observer.next(data);
observer.complete();
})
.catch(error => {
observer.error({ error, task });
});
}).pipe(retry(2));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment