Created
March 6, 2019 10:41
-
-
Save bsh314/f9f938809e90a7a2f5270b11be2b952f to your computer and use it in GitHub Desktop.
Provides scheduled execution with some extras for rxjs.Observable objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { delay } from "rxjs/operators/delay";
import { take } from "rxjs/operators/take";
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import { Subscription } from "rxjs/Subscription";
import { v4 as uuid } from 'uuid';
/**
 * Shape of a single unit of work handled by the scheduler.
 *
 * @export
 * @interface RxSchedulerTask
 * @template T type of the value produced by the task observable
 */
export interface RxSchedulerTask<T = any> {
    /** Identifier used to look the task up (e.g. when removing it from the queue). */
    id: string;

    /** Observable that performs the work. */
    value: Observable<T>;

    /**
     * When `true` the scheduler waits for this task before moving on;
     * when `false` the following task is started right away.
     */
    await: boolean;

    /** Number of re-executions still allowed after an error. */
    retry?: number;

    /** Subscription of the running task; absent until the task has been executed. */
    subscription?: Subscription;
}
/**
 * Queue that executes `Observable` tasks and republishes the first value of
 * every task on one output stream.
 *
 * Tasks are added with `push()`, consumers listen with `subscribe()`. A task
 * flagged `await` (the default) blocks the queue until it emits or fails; a
 * task with `await: false` lets the following task start immediately.
 *
 * Note: the output stream is an RxJS `Subject`, so once an error has been
 * published on it the stream is terminated and later results are not delivered.
 *
 * @export
 * @class RxScheduler
 * @template T
 */
export class RxScheduler<T = any> {
    current: RxSchedulerTask; // active task (undefined while idle)
    tasks: RxSchedulerTask[] = []; // pending tasks, executed first-in first-out
    finished: RxSchedulerTask[] = []; // completed tasks, newest first

    // Output subject. Created eagerly so that a task finishing before anyone
    // called subscribe() cannot hit an undefined subject.
    private subject: Subject<T> = new Subject<T>();
    private subscription: Subscription; // output subject subscription

    /**
     * @param {number} [storageLimit=10] maximum number of finished tasks kept in `finished`
     * @param {boolean} [debug=false] print diagnostic output to the console
     * @memberof RxScheduler
     */
    constructor(
        private storageLimit = 10,
        private debug = false,
    ) { }

    /**
     * Current execution state: `true` only when there is no open output
     * subscription AND no task subscription that is still running.
     *
     * @readonly
     * @type {boolean}
     * @memberof RxScheduler
     */
    public get closed(): boolean {
        const outputClosed = !this.subscription || this.subscription.closed;
        const taskIdle = !this.current
            || !this.current.subscription
            || this.current.subscription.closed;
        return outputClosed && taskIdle;
    }

    /**
     * Add a task to the pending queue and start it right away when the
     * scheduler is idle.
     *
     * @param {Observable<any>} task
     * @param {{
     *         id?: string,
     *         delay?: number,
     *         retry?: number,
     *         await?: boolean
     *     }} [params]
     * @memberof RxScheduler
     */
    public push(
        task: Observable<any>,
        params?: {
            id?: string,
            delay?: number,
            retry?: number,
            await?: boolean
        }
    ): void {
        const options = params || {};

        // delay the observable when a positive timespan is provided
        if (options.delay > 0) {
            task = task.pipe(delay(options.delay));
        }

        const data: RxSchedulerTask = {
            id: options.id ? options.id : uuid(), // generate an ID if none is provided
            await: options.await !== false, // wait for the task to finish by default
            retry: options.retry > 0 ? options.retry : 0, // no retries by default
            value: task
        };
        this.tasks.push(data);

        if (this.debug) {
            console.groupCollapsed('RxScherduler:push:DEBUG: {' + data.id + '} task added [' + this.tasks.length + ']');
            console.table(this.tasks);
            console.groupEnd();
        }

        // nothing is running, so the new task can start immediately
        if (!this.current) {
            this.next();
        }
    }

    /**
     * Remove a task by id. The active task is cancelled and detached; a
     * pending task is taken out of the queue. Removing the active task does
     * not start the following one by itself - the queue resumes on the next
     * `push()` or `subscribe()`.
     *
     * @param {string} id
     * @returns {RxSchedulerTask} the removed task, or `null` when the id is unknown
     * @memberof RxScheduler
     */
    public pull(id: string): RxSchedulerTask {
        if (this.current && this.current.id === id) {
            const active = this.current;
            if (active.subscription) {
                active.subscription.unsubscribe();
            }
            this.current = undefined;
            return active;
        }

        const index = this.tasks.findIndex(t => t.id === id);
        if (index > -1) {
            return this.tasks.splice(index, 1)[0];
        }
        return null;
    }

    /**
     * Listen to task results and start execution of pending tasks.
     *
     * While an open output subscription exists it is returned as is and the
     * callbacks passed to this call are not registered.
     *
     * @param {(value: T) => void} [next]
     * @param {(error: any) => void} [error]
     * @param {() => void} [complete]
     * @returns {Subscription}
     * @memberof RxScheduler
     */
    public subscribe(
        next?: (value: T) => void,
        error?: (error: any) => void,
        complete?: () => void
    ): Subscription {
        // reuse the already existing subscription if available
        if (this.subscription && !this.subscription.closed) {
            return this.subscription;
        }

        // Attach the listener BEFORE starting work, otherwise results of
        // synchronously emitting tasks would be published with nobody listening.
        this.subscription = this.subject.subscribe(next, error, complete);

        // Only kick the queue when idle: calling next() while a task is
        // running would cancel that task.
        if (!this.current) {
            this.next();
        }
        return this.subscription;
    }

    /**
     * Stop execution: closes the output subscription, cancels the active
     * task and drops all pending tasks. Finished tasks are kept.
     *
     * @memberof RxScheduler
     */
    public unsubscribe(): void {
        if (this.subscription && !this.subscription.closed) {
            this.subscription.unsubscribe();
        }
        if (this.current && this.current.subscription && !this.current.subscription.closed) {
            this.current.subscription.unsubscribe();
        }
        this.current = undefined;
        this.tasks = [];
    }

    /**
     * Proceed to the next pending task and archive the previous one if available.
     *
     * @private
     * @returns {RxSchedulerTask} the task that is active after the call (may be undefined)
     * @memberof RxScheduler
     */
    private next(): RxSchedulerTask {
        const previous = this.current;
        // an awaited task that is still subscribed is cancelled before moving on;
        // background tasks (await = false) are left running
        if (previous && previous.subscription && !previous.subscription.closed && previous.await === true) {
            previous.subscription.unsubscribe();
        }
        this.store();

        if (this.tasks && this.tasks.length > 0) {
            this.current = this.tasks.shift();
            if (this.debug) {
                console.log('RxScherduler:next:DEBUG: next task {' + this.current.id + '}');
            }
        }
        if (this.current) {
            this.execute(this.current);
        }
        return this.current;
    }

    /**
     * Move the active task into the in-memory list of finished tasks.
     * The list keeps at most "storageLimit" entries (constructor parameter),
     * dropping the oldest ones first.
     *
     * @private
     * @returns {RxSchedulerTask} the archived task, or undefined when nothing was active
     * @memberof RxScheduler
     */
    private store(): RxSchedulerTask {
        const task = this.current;
        if (!task) {
            if (this.debug) {
                console.info('RxScherduler:store:DEBUG: no previous task found.');
            }
            return undefined;
        }

        this.finished.unshift(task);
        // trim AFTER inserting so that a limit of 0 really stores nothing
        const limit = Math.max(0, this.storageLimit);
        if (this.finished.length > limit) {
            this.finished.splice(limit);
        }
        this.current = undefined;

        if (this.debug) {
            console.groupCollapsed('RxScherduler:store:DEBUG: {' + task.id + '} task stored [' + this.finished.length + ']');
            console.table(this.finished);
            console.groupEnd();
        }
        return task;
    }

    /**
     * Execute the active task again if it has retries left.
     *
     * @private
     * @returns {RxSchedulerTask}
     * @throws {Error} when there is no active task
     * @memberof RxScheduler
     */
    private retry(): RxSchedulerTask {
        if (!this.current) {
            throw new Error('RxScherduler:retry:ERROR: No awailable task specified!!!');
        }
        if (this.current.retry > 0) {
            this.current.retry = this.current.retry - 1; // one opportunity used
            this.execute(this.current);
        }
        return this.current;
    }

    /**
     * Subscribe to the task observable and publish its first value on the
     * output subject. Awaited tasks advance the queue when they emit or fail
     * for good; background tasks advance the queue immediately.
     *
     * @private
     * @param {RxSchedulerTask} task
     * @throws {Error} when no task is given
     * @memberof RxScheduler
     */
    private execute(task: RxSchedulerTask): void {
        if (!task) {
            throw new Error('RxScherduler:execute:ERROR: No awailable task specified!!!');
        }
        if (task.subscription && !task.subscription.closed) {
            console.info('RxScherduler:execute:WARN: Task {' + task.id + '} is already running!!!');
        }

        // only the first response of the observable is picked
        task.subscription = task.value.pipe(take(1)).subscribe(
            response => {
                if (this.debug) {
                    console.groupCollapsed('RxScherduler:execute:DEBUG: Task {' + task.id + '} completed');
                    console.log('timestamp', new Date().getTime());
                    console.log('response', response);
                    console.groupEnd();
                }
                this.subject.next(response);
                // background tasks (await = false) already advanced the queue
                if (task.await) {
                    this.next();
                }
            },
            error => {
                if (this.debug) {
                    console.groupCollapsed('RxScherduler:execute:DEBUG: Task {' + task.id + '} completed with errors');
                    console.log('timestamp', new Date().getTime());
                    console.error('error', error);
                    console.groupEnd();
                }
                // Retry first: publishing the error terminates the output
                // subject, so it must only happen once no retries remain.
                if (task.await && task.retry > 0) {
                    this.retry();
                    return;
                }
                this.subject.error(error);
                if (task.await) {
                    this.next();
                }
            }
        );

        // a background task does not block the queue
        if (!task.await) {
            this.next();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment