Skip to content

Instantly share code, notes, and snippets.

@siddharthpal
Last active November 9, 2019 14:33
Show Gist options
  • Save siddharthpal/4b2fe932c6e41dd84bd1d489d8733ba0 to your computer and use it in GitHub Desktop.
Save siddharthpal/4b2fe932c6e41dd84bd1d489d8733ba0 to your computer and use it in GitHub Desktop.
Polling multiple resources with exponential back-off strategy and 3 times retry for failed http requests. #pool-polling #rxjs6 #reactive-programming
import { timer as observableTimer, Subscription, interval, of, concat, SchedulerLike, asyncScheduler, Observable } from 'rxjs';
import { takeWhile, tap, take, switchMap, repeat, retryWhen, scan, mapTo, expand, exhaustMap } from 'rxjs/operators';
import { Injectable, Inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
/**
 * Options for the back-off tick stream produced by PollingService's private
 * `intervalBackoff` helper.
 */
export interface IntervalBackoffConfig {
/** Base interval (ms) handed to `backoffDelay`; negative values are clamped to 0. */
initialInterval: number;
/** Upper bound (ms) applied to every computed delay; 10000 is used when omitted. */
maxInterval?: number;
/**
 * Computes the wait before the next tick from the zero-based iteration number and
 * `initialInterval`. When omitted, `2^iteration * initialInterval` is used.
 */
backoffDelay?: (iteration: number, initialInterval: number) => number;
}
/**
 * Caller-tunable settings for PollingService.pollRequest. Every field is optional;
 * omitted fields fall back to the service's defaults.
 */
export interface PollConfig {
/** Wait (ms) between the first request completing and back-off polling starting. Default 1000. */
interval?: number;
/** Error count at which retrying stops and the error is propagated. Default 3. */
attempts?: number;
/** Wait (ms) before re-subscribing after a failed request. Default 3000. */
delay?: number;
/** Base interval (ms) of the exponential back-off between polls. Default 1000. */
exponentialUnit?: number;
/** Upper bound (ms) for the back-off wait between polls. Default 10000. */
maxBackoffTime?: number;
}
/**
 * Polls HTTP endpoints with an exponential back-off between requests and a bounded
 * number of retries for failed requests.
 *
 * Every poll started through `pollRequest` is tracked in one composite subscription,
 * so `stopPolling` cancels all of them at once.
 */
@Injectable()
export class PollingService {
  /** Composite subscription holding every poll started and not yet stopped. */
  private pollSubscriptions: Subscription;

  /** Values used for any PollConfig field the caller does not supply. */
  private readonly defaultConfig: Required<PollConfig> = {
    interval: 1000,
    attempts: 3,
    exponentialUnit: 1000,
    delay: 3000,
    maxBackoffTime: 10000
  };

  constructor(
    private http: HttpClient,
  ) {
    this.pollSubscriptions = new Subscription();
  }

  /**
   * Starts polling `url` with GET requests.
   *
   * Sequence: one request is issued immediately; once it completes, the service waits
   * `interval` ms and then issues further requests on an exponential back-off schedule
   * (`exponentialUnit`, doubled each tick, capped at `maxBackoffTime`). Back-off ticks
   * that arrive while a request is still in flight are dropped.
   *
   * When a request fails, the whole sequence is restarted after `delay` ms. Polling
   * gives up once `attempts` consecutive failures have occurred; any successful
   * response resets the failure count.
   *
   * @param url Endpoint to poll.
   * @param updateStatus Handed to `tap`; receives every response, including the one
   *   that ends polling.
   * @param pollWhileCondition Called with each response; polling continues while it
   *   returns a truthy value.
   * @param pollErrorCallback Called with the error once retries are exhausted (or when
   *   `updateStatus` / `pollWhileCondition` throws). When omitted, the error is left
   *   unhandled and reported by RxJS, as before.
   * @param onPollingSuccessCallback Called when polling ends because
   *   `pollWhileCondition` returned a falsy value; receives the last response.
   * @param userConfig Overrides for the default timing / retry settings.
   */
  pollRequest<T>(
    url: string,
    updateStatus: any,
    pollWhileCondition: Function,
    pollErrorCallback?: Function,
    onPollingSuccessCallback?: Function,
    userConfig?: PollConfig
  ) {
    const options: Required<PollConfig> = Object.assign({}, this.defaultConfig, userConfig);

    // stopPolling() closes the composite subscription; a closed Subscription would
    // immediately unsubscribe anything added to it, so start a fresh one.
    if (this.pollSubscriptions.closed) {
      this.pollSubscriptions = new Subscription();
    }

    const request$ = this.http.get<T>(url);

    const polling$ = interval(options.interval).pipe(
      take(1),
      switchMap(() => this.intervalBackoff({
        initialInterval: options.exponentialUnit,
        maxInterval: options.maxBackoffTime
      })),
      // Ignore back-off ticks while the previous request is still pending.
      exhaustMap(() => request$),
      repeat()
    );

    // Captured as constants so the narrowing below holds inside the closures.
    const onError = pollErrorCallback;
    const onSuccess = onPollingSuccessCallback;

    let consecutiveErrors = 0;
    let lastResponse: T | undefined;

    const poll$ = concat(request$, polling$).pipe(
      tap(response => {
        // A successful response ends the current failure streak.
        consecutiveErrors = 0;
        lastResponse = response;
      }),
      retryWhen(errors$ => errors$.pipe(
        switchMap(error => {
          consecutiveErrors += 1;
          if (consecutiveErrors >= options.attempts) {
            throw error;
          }
          return observableTimer(options.delay);
        })
      )),
      tap(updateStatus),
      takeWhile(data => pollWhileCondition(data))
    );

    this.pollSubscriptions.add(poll$.subscribe({
      // Leaving `error` undefined keeps RxJS's default unhandled-error reporting.
      error: onError ? (error: any) => { onError(error); } : undefined,
      complete: () => {
        if (onSuccess) {
          onSuccess(lastResponse);
        }
      }
    }));
  }

  /** Cancels every poll started through `pollRequest`. */
  stopPolling(): void {
    this.pollSubscriptions.unsubscribe();
  }

  /** Caps a computed back-off delay at `maxInterval`. */
  private getDelay(backoffDelay: number, maxInterval: number) {
    return Math.min(backoffDelay, maxInterval);
  }

  /** Default back-off function: `2^iteration * initialInterval`. */
  private exponentialBackoffDelay(iteration: number, initialInterval: number) {
    return Math.pow(2, iteration) * initialInterval;
  }

  /**
   * Builds a never-completing stream of tick numbers (0, 1, 2, ...). Tick 0 is emitted
   * via `scheduler`; tick n + 1 follows tick n after
   * `min(backoffDelay(n, initialInterval), maxInterval)` ms.
   *
   * @param config Either the initial interval in ms, or a full IntervalBackoffConfig.
   * @param scheduler Scheduler used to emit the first tick.
   */
  private intervalBackoff(
    config: number | IntervalBackoffConfig,
    scheduler: SchedulerLike = asyncScheduler,
  ): Observable<number> {
    let {
      initialInterval,
      maxInterval = 10000,
      backoffDelay = this.exponentialBackoffDelay
    } = typeof config === 'number' ? { initialInterval: config } : config;

    // A negative base interval makes no sense as a delay; treat it as "no wait".
    initialInterval = (initialInterval < 0) ? 0 : initialInterval;

    return of(0, scheduler).pipe(
      expand(iteration =>
        observableTimer(this.getDelay(backoffDelay(iteration, initialInterval), maxInterval))
          .pipe(mapTo(iteration + 1))
      )
    );
  }
}
/* ==================================== Example usage of the above code ========================== */
/* Step 1: Inject the above service into your component */
// Parameter property: the injected PollingService is stored as this.pollingSvc.
constructor(
private pollingSvc: PollingService,
) {
}
/* Step 2: Define the poll-while condition and the status-update method */
// Forwards each polled status to emitterSub, which is declared elsewhere in the
// component (presumably an RxJS Subject — confirm against the component).
updateStatus(pollStatus: PollStatus) {
this.emitterSub.next(pollStatus);
}
// Polling should continue only while the polled resource still reports 'running'.
pollWhileCondition(pollStatus: PollStatus) {
  const stillRunning = pollStatus.executionStatus === 'running';
  return stillRunning;
}
/* Step 3: Call the PollingService pollRequest method to start polling */
// bind(this) keeps the component as `this` when the service later invokes these
// callbacks as plain functions.
this.pollingSvc.pollRequest('http://poll.url',
this.updateStatus.bind(this),
this.pollWhileCondition.bind(this));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment