Skip to content

Instantly share code, notes, and snippets.

@SandeepThomas
Created November 15, 2018 11:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SandeepThomas/8b37f937b5150c9056b29d44143cf29b to your computer and use it in GitHub Desktop.
Save SandeepThomas/8b37f937b5150c9056b29d44143cf29b to your computer and use it in GitHub Desktop.
Angular RxJS WebSocket
import { Injectable } from '@angular/core';
import { fromEvent, NextObserver, Observable, Subject, Subscription, timer } from 'rxjs';
import { _throw } from 'rxjs/observable/throw';
import { finalize, mergeMap, retryWhen, take } from 'rxjs/operators';
import { WebSocketSubject } from 'rxjs/webSocket';
import { environment } from '../../../environments/environment';
import { AuthService } from './authentication.service';
/**
 * Owns a single WebSocket connection to `environment.WS_URL` and relays the
 * messages it receives to the rest of the application.
 *
 * The connection is only opened while `AuthService.isAuthenticated()` is true.
 * Socket errors are retried with a linearly growing, capped delay; while the
 * browser reports being offline the retry waits for the next `online` event.
 */
@Injectable()
export class SocketService {
  /** Underlying RxJS WebSocket subject; replaced each time `connectSocket()` opens a connection. */
  private ws$: WebSocketSubject<any>;
  /** Subscription to `ws$` through the retry pipeline; its `closed` flag gates reconnects. */
  private _socket$: Subscription;
  /** Relay for every socket message that passed `filterByType()`. */
  private _socketMsg$: Subject<any> = new Subject<any>();

  constructor(
    private authService: AuthService
  ) { }

  /**
   * Builds the notifier passed to `retryWhen`.
   *
   * For every error emitted by the socket:
   *  - if the user is no longer authenticated, the error is re-thrown, which
   *    ends the retry loop and errors the subscription;
   *  - if the browser is online, the socket is retried after
   *    `min(attempt * scalingDuration, maxDuration)` milliseconds;
   *  - if the browser is offline, the retry waits for the next `online` event.
   *
   * Note: the attempt number is the index of the error within the current
   * subscription, so it is not reset by a successful reconnect; delays keep
   * growing (up to `maxDuration`) for the lifetime of one `connectSocket()`.
   *
   * @param scalingDuration Milliseconds added to the delay per attempt (default 500).
   * @param maxDuration Upper bound for the delay in milliseconds (default 30000).
   * @returns A function mapping the stream of errors to a stream of retry signals.
   */
  private customRetryStrategy = ({
    scalingDuration = 500,
    maxDuration = 30000
  }: {
    scalingDuration?: number,
    maxDuration?: number
  } = {}) => (attempts: Observable<any>) => {
    return attempts.pipe(
      mergeMap((error, i) => {
        const retryAttempt = i + 1;
        // If the user is not logged in, give up: propagate the error.
        if (this.authService && !this.authService.isAuthenticated()) {
          return _throw(error);
        }
        if (window.navigator.onLine) {
          // Retry after 0.5s, 1s, 1.5s, ... capped at maxDuration. Compute the
          // capped value first so the log reports the delay actually used.
          const duration = Math.min(retryAttempt * scalingDuration, maxDuration);
          console.debug(`[Websocket] Attempt ${retryAttempt}: retrying in ${duration}ms`);
          return timer(duration);
        } else {
          // Offline: retrying is pointless until connectivity returns.
          return fromEvent(window, 'online').pipe(take(1));
        }
      }),
      finalize(() => console.debug('[Websocket] Retry Done!'))
    );
  };

  /**
   * Opens the WebSocket if the user is authenticated and no live subscription
   * exists. Calling it while a connection is already active is a no-op.
   */
  connectSocket(): void {
    const isAuth = this.authService.isAuthenticated();
    const closed = !this._socket$ || this._socket$.closed;
    if (isAuth && closed) {
      const url = environment.WS_URL + '/';
      const openObserver: NextObserver<Event> = {
        next: () => console.debug('[Websocket] Connected')
      };
      const closeObserver: NextObserver<CloseEvent> = {
        next: () => console.debug('[Websocket] Disconnected')
      };
      this.ws$ = new WebSocketSubject({
        url,
        openObserver,
        closeObserver
      });
      this._socket$ = this.ws$.pipe(retryWhen(this.customRetryStrategy()))
        .subscribe(res => {
          this.filterByType(res);
        }, (error) => console.debug(error),
          () => console.debug('[Websocket] Completed!'));
    }
  }

  /** Tears down the socket subscription, if one was ever created. */
  disconnectSocket(): void {
    if (!this._socket$) {
      return;
    }
    this._socket$.unsubscribe();
  }

  /**
   * @returns An observable of every message relayed from the socket. It stays
   * usable across disconnect/reconnect cycles because it is backed by a
   * service-level subject rather than by the socket itself.
   */
  getAllUpdates(): Observable<any> {
    return this._socketMsg$.asObservable();
  }

  /**
   * Logs the `ENGL` field of the JSON document carried in `data.message` and
   * forwards `data` (unchanged) to subscribers of `getAllUpdates()`.
   *
   * A payload whose `message` is missing or is not valid JSON is logged and
   * dropped. The error must not escape: this method runs inside the socket
   * subscription's next-handler, and an exception thrown there makes RxJS 6
   * unsubscribe that subscriber, which would silently close the socket
   * without going through the retry strategy.
   *
   * @param data Raw message object received from the socket.
   */
  private filterByType(data: any) {
    try {
      const notification = JSON.parse(data.message);
      const msg = notification.ENGL;
      console.debug('[Websocket] ', msg);
    } catch (err) {
      console.error('[Websocket] Dropping malformed message', err, data);
      return;
    }
    this._socketMsg$.next(data);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment