Skip to content

Instantly share code, notes, and snippets.

@bsh314
Created March 6, 2019 10:41
Show Gist options
  • Save bsh314/f9f938809e90a7a2f5270b11be2b952f to your computer and use it in GitHub Desktop.
Save bsh314/f9f938809e90a7a2f5270b11be2b952f to your computer and use it in GitHub Desktop.
Provides scheduled execution with some extras for rxjs.Observable objects
import { delay } from "rxjs/operators/delay";
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import { Subscription } from "rxjs/Subscription";
import { v4 as uuid } from 'uuid';
/**
* Default scheduler task interface
*
* @export
* @interface RxSchedulerTask
*/
export interface RxSchedulerTask<T = any> {
id: string;
value: Observable<T>;
await: boolean;
subscription?: Subscription;
retry?: number;
}
/**
* This class provides configurable way to perform scheduled tasks execution
*
* @export
* @class RxScheduler
* @template T
*/
export class RxScheduler<T = any> {
current: RxSchedulerTask; // active task
tasks: RxSchedulerTask[] = []; // pending tasks
finished: RxSchedulerTask[] = []; // completed tasks
private subject: Subject<T>; // output subject
private subscription: Subscription; // output subject subscription
constructor(
private storageLimit = 10,
private debug = false,
) { }
/**
* Current execution state
*
* @readonly
* @type {boolean}
* @memberof RxScheduler
*/
public get closed(): boolean {
return (
!this.subscription || this.subscription.closed
) && (
!this.current || !this.current.subscription ||
(this.current.subscription && !this.current.subscription.closed)
);
}
/**
* Add task to pending execution stack
*
* @param {Observable<any>} task
* @param {{
* id?: string,
* delay?: number,
* retry?: number,
* await?: boolean,
* debug?: boolean
* }} [params]
* @memberof RxScheduler
*/
public push(
task: Observable<any>,
params?: {
id?: string,
delay?: number,
retry?: number,
await?: boolean
}
) {
// delay observable completion if timespan is providen
if (params && params.delay > 0) {
task = task.pipe(delay(params.delay));
}
const data = {
id: params && params.id ? params.id : uuid(), // generate ID if not providen
await: !params || params.await !== false, // wait for previous task to complete by default
retry: params && params.retry > 0 ? params.retry : 0, // on error do not retry execution by default
value: task // default task observable
};
this.tasks.push(data);
if (this.debug) {
console.groupCollapsed('RxScherduler:push:DEBUG: {' + data.id + '} task added [' + this.tasks.length + ']');
console.table(this.tasks);
console.groupEnd();
}
// start execution if previous task was already completed
if (!this.current) {
this.next();
}
}
/**
* Remove task from pending execution stack
*
* @param {string} id
* @returns {RxSchedulerTask}
* @memberof RxScheduler
*/
public pull(id: string): RxSchedulerTask {
if (this.current && this.current.id === id) {
if (this.current.subscription) {
this.current.subscription.unsubscribe();
}
const task = this.current;
this.current = undefined;
return task;
}
const index = this.tasks.findIndex(t => t.id === id);
if (index > -1) {
return this.tasks.splice(index, 1)[0];
}
return null;
}
/**
* Start execution
*
* @param {(value: T) => void} [next]
* @param {(error: any) => void} [error]
* @param {() => void} [complete]
* @returns {Subscription}
* @memberof RxScheduler
*/
public subscribe(
next?: (value: T) => void,
error?: (error: any) => void,
complete?: () => void
): Subscription {
if (!this.subject) {
this.subject = new Subject<T>();
}
// get already existent subscription if awailable
if (this.subscription && !this.subscription.closed) {
return this.subscription;
}
this.next();
return this.subscription = this.subject.subscribe(next, error, complete);
}
/**
* Stop execution
*
* @memberof RxScheduler
*/
public unsubscribe(): void {
if (this.subscription && !this.subscription.closed) {
this.subscription.unsubscribe();
}
if (this.current && this.current.subscription && !this.current.subscription.closed) {
this.current.subscription.unsubscribe();
}
this.current = undefined;
this.tasks = [];
}
/**
* Proceed to next task & store previous one if awailable
*
* @private
* @returns {RxSchedulerTask}
* @memberof RxScheduler
*/
private next(): RxSchedulerTask {
if (this.current && this.current.subscription && !this.current.subscription.closed && this.current.await === true) {
this.current.subscription.unsubscribe();
}
this.store();
if (this.tasks && this.tasks.length > 0) {
this.current = this.tasks.shift();
if (this.debug) {
console.log('RxScherduler:next:DEBUG: next task {' + this.current.id + '}');
}
}
if (this.current) {
this.execute(this.current);
}
return this.current;
}
/**
* Store finished task in memory cache.
* Size of chache is defined by constructor parameter "storageLimit".
*
* @private
* @returns {RxSchedulerTask}
* @memberof RxScheduler
*/
private store(): RxSchedulerTask {
if (!this.current) {
if (this.debug) {
console.info('RxScherduler:store:DEBUG: no previous task found.');
}
return undefined;
}
if (this.finished.length + 1 > this.storageLimit) {
this.finished.pop();
}
const task = this.current
this.finished.unshift(task);
this.current = undefined;
if (task && this.debug) {
console.groupCollapsed('RxScherduler:store:DEBUG: {' + task.id + '} task stored [' + this.finished.length + ']');
console.table(this.finished);
console.groupEnd();
}
return task;
}
/**
* Retry the task execution if config is providen
*
* @private
* @returns {RxSchedulerTask}
* @memberof RxScheduler
*/
private retry(): RxSchedulerTask {
if (!this.current) {
throw new Error('RxScherduler:retry:ERROR: No awailable task specified!!!');
}
if (this.current && this.current.retry > 0) {
this.current.retry = this.current.retry - 1; // reduce opportunities
this.execute(this.current);
}
return this.current;
}
/**
* Run run task and wait to it's completion or execute it on background
*
* @private
* @param {RxSchedulerTask} task
* @returns {*}
* @memberof RxScheduler
*/
private execute(task: RxSchedulerTask): any {
if (!task) {
throw new Error('RxScherduler:execute:ERROR: No awailable task specified!!!');
}
if (task && task.subscription && !task.subscription.closed) {
console.info('RxScherduler:execute:WARN: Task {' + task.id + '} is already running!!!');
}
// this will pick only first response of the observable
task.subscription = task.value.take(1).subscribe(
response => {
if (this.debug) {
console.groupCollapsed('RxScherduler:execute:DEBUG: Task {' + task.id + '} completed');
console.log('timestamp', new Date().getTime());
console.log('response', response);
console.groupEnd();
}
this.subject.next(response);
// do not force next task when completed if task was defined with "await = false"
if (task.await) {
this.next();
}
},
error => {
if (this.debug) {
console.error('RxScherduler:execute:DEBUG: Task {' + task.id + '} completed with errors');
console.log('timestamp', new Date().getTime());
console.error('error', error);
console.groupEnd();
}
this.subject.error(error);
if (task.await) {
// Retry the operation if there are more specified intents
if (task.retry > 0) {
this.retry();
} else {
this.next();
}
}
}
);
// force next task if it was defined with "await = false"
if (!task.await) {
this.next();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment