Skip to content

Instantly share code, notes, and snippets.

@merqlove
Last active March 6, 2017 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save merqlove/301dc37f53d4ee13cabbd81b7dcda1e3 to your computer and use it in GitHub Desktop.
Save merqlove/301dc37f53d4ee13cabbd81b7dcda1e3 to your computer and use it in GitHub Desktop.
Phoenix provider with Angular2
import {Injectable, OpaqueToken, Inject} from '@angular/core';
import * as PhoenixBase from 'phoenix/web/static/js/phoenix.js';
import {Channel, Socket} from 'phoenix/web/static/js/phoenix.js';
import {Observable} from 'rxjs/Observable';
import {isFunction} from 'lodash';
import {Observer} from 'rxjs/Observer';
import {LoggerService} from './';
export declare class Timer {
callback: any;
timerCalc: any;
timer: any;
tries: number;
reset: () => void;
scheduleTimeout: () => void;
}
export interface ISocket {
PhoenixBase: any;
protocol(): string;
endPointURL(): string;
disconnect(callback?: Function, code?: string, reason?: any): void;
connect(params?: any): void;
log(kind: string, msg: string, data: any): void;
onOpen(callback: Function): void;
onClose(callback: Function): void;
onError(callback: Function): void;
onMessage(callback: Function): void;
onConnOpen(): void;
onConnClose(event: any): void;
onConnError(error: any): void;
triggerChanError(): void;
connectionState(): string;
isConnected(): boolean;
remove(channel: IChannel): void;
channel(topic: string, chanParams?: Object): IChannel;
push(data: any): void;
makeRef(): string;
sendHeartbeat(): void;
flushSendBuffer(): void;
onConnMessage(rawMessage: any): void;
}
export interface IPush {
observable: Observable<any>;
resend(timeout: number): void;
send(): void;
receive(status: string, callback: (response?: any) => void): IPush;
}
export interface IChannel {
state: string;
topic: string;
params: any;
socket: ISocket;
timeout: any;
joinedOnce: boolean;
joinPush: IPush;
pushBuffer: any;
rejoinTimer: Timer;
rejoinUntilConnected(): void;
join(timeout?: number, scope?: () => any): IPush;
init<T>(timeout?: number, scope?: () => any): IPush;
leave(timeout?: number): IPush;
onClose(callback: Function): void;
onError(callback: (reason?: any) => void): void;
onMessage(event: string, payload: any, ref: any): any;
on(event: string, callback: (response?: any) => void): void;
subscribe<T>(event: string, callback: (response?: any) => void): Observable<any>;
off(event: string): void;
canPush(): boolean;
push(event: string, payload: Object, timeout?: number): IPush;
}
const clone: any = (obj: any) => {
if (undefined === obj || 'object' !== typeof obj) {
return obj;
}
const copy: any = obj.constructor();
for (const attr in obj) {
if (obj.hasOwnProperty(attr)) {
copy[attr] = obj[attr];
}
}
return copy;
};
export const PHOENIX_OPTIONS: OpaqueToken = new OpaqueToken('PhoenixOptions');
export interface IPhoenixOptions {
urlBase: string;
autoJoinSocket: boolean;
defaults: any;
socket: ISocket;
loggerHandler: any;
}
const DEFAULT_OPTIONS: IPhoenixOptions = {
urlBase: '/ws',
autoJoinSocket: true,
defaults: {},
socket: undefined,
loggerHandler: () => {
// some
},
};
@Injectable()
export class PhoenixProvider {
_urlBase: string;
_autoJoinSocket: boolean;
_defaults: any;
_socket: ISocket;
_loggerHandler: any;
constructor(private _loggerService: LoggerService, @Inject(PHOENIX_OPTIONS) options?: IPhoenixOptions) {
this._loggerService.debug('PhoenixProvider init');
const {urlBase, autoJoinSocket, defaults, socket, loggerHandler} = Object.assign({}, DEFAULT_OPTIONS, options);
this._urlBase = urlBase;
this._autoJoinSocket = autoJoinSocket;
this._defaults = defaults;
this._socket = socket;
this._loggerHandler = loggerHandler;
this.patchPhoenix();
}
setup(): ISocket {
if (this.socket && this.socket.isConnected()) {
this.reinstantiate();
} else {
this.instantiate();
}
if (this.autoJoinSocket) {
this.socket.connect();
} else {
const args: ISocket[] = [this.socket];
this.socket.connect = this.socket.connect.bind(this, ...args);
}
this.socket.PhoenixBase = PhoenixBase;
return this._socket;
}
get socket(): ISocket {
return this._socket;
}
set socket(socket: ISocket) {
this._socket = socket;
}
get urlBase(): string {
return this._urlBase;
}
get defaults(): any {
return this._defaults;
}
get autoJoinSocket(): boolean {
return this._autoJoinSocket;
}
get loggerHandler(): any {
return this._loggerHandler;
}
set urlBase(url: string) {
this._urlBase = url;
}
set defaults(defaults: any) {
this._defaults = defaults;
}
set autoJoinSocket(bool: boolean) {
this._autoJoinSocket = bool;
}
set loggerHandler(handler: any) {
this._loggerHandler = handler;
}
private patchPhoenix(): void {
Channel.prototype.subscribe = (() => {
return function subscribe<T>(event: any, callback: any): Observable<T> {
const self: any = this;
const obb: Observable<T|any> = new Observable<T|any>((observer: Observer<T|any>) => {
const newCallback: any = (message: T|any) => {
if (isFunction(callback)) {
callback(message);
}
observer.next(message);
};
self.on(event, newCallback);
});
obb.subscribe({
complete: () => {
self.off(event);
},
});
return obb;
};
})();
Channel.prototype.init = (() => {
return function init<T>(scope?: any): IPush {
const res: IPush = this.join();
res.observable = new Observable<T|any>( (observer: Observer<T|any>) => {
res
.receive('ok', (resp: T|any) => {
observer.next(resp);
})
.receive('error', (resp: T|any) => {
observer.error(resp);
});
});
res.observable.subscribe({
complete: () => {
if (isFunction(scope)) {
scope();
}
},
});
return res;
};
})();
}
private instantiate(): void {
this.socket = <ISocket> (new Socket(this.urlBase, {
timeout: 5000,
params: this.defaults,
logger: this.loggerHandler,
}));
}
private reinstantiate(): boolean|void {
return this.disconnect(this.socket.connect);
}
private disconnect(cb: any): boolean|void {
if (this.socket) {
return this.socket.disconnect(cb);
}
return false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment