/** | |
* This module exports an Angular 2 WebSocketService implementing Sideboard's
* WebSocket RPC protocol. There are three main actions exposed to consumers: | |
* calling RPC methods, making subscriptions, and canceling subscriptions. | |
* | |
* | |
* Calling RPC Methods | |
* ------------------- | |
* Consumers can make one-off RPC calls which return promises: | |
* | |
* this.ws.call('foo.bar', 'baz', 'baf').then(onSuccess).catch(onError); | |
* | |
* This translates into an RPC request like this: | |
* | |
* {"method": "foo.bar", "params": ["baz", "baf"], "callback": "callback-42"} | |
* | |
* The promise is resolved when we get a non-error response back from the server | |
* and rejected when we get an error response or time out. Any response object | |
* with an "error" field is treated as an error. | |
* | |
* Consumers can also use the long-form approach to making calls by passing in | |
* a CallConfig object instead of a method name and list of parameters: | |
* | |
* this.ws.call({ | |
* method: "foo.bar", | |
* params: ["baz", "baf"] | |
* }).then(onSuccess).catch(onError); | |
* | |
* This would generate an identical RPC request to the previous example, but is | |
* more flexible in that it allows you to use keyword arguments instead of | |
* positional arguments, as well as specify your own custom timeout value. | |
* | |
* | |
* Making Subscriptions | |
* -------------------- | |
* Consumers can make "subscription" requests to the server, which returns a | |
* value immediately and then also returns an updated value every time it | |
* changes. These subscription calls return a WebSocketObservable object, which | |
* is an Observable with an additional subscriptionId field which identifies it. | |
* | |
* For example, if we were implementing the Angular 2 Tour of Heroes tutorial | |
* using WebSocketService, we might do something like this: | |
* | |
* @Injectable() | |
* export class HeroService { | |
* heroes: WebSocketObservable<any>; | |
* constructor (private ws: WebSocketService) { | |
* this.heroes = this.ws.subscribe({method: 'heroes.get_all_heroes'}); | |
* } | |
* } | |
* | |
* Then other classes could use the standard Observable.subscribe() method to | |
* get the latest data which was returned from the server, e.g. | |
* | |
* this.heroService.heroes.subscribe(processDataCallback, handleErrorCallback); | |
* | |
* WebSocketObservables can also be updated. For example, if our heroes service | |
* had paging, instead of canceling our subscription and creating a new one, the | |
* HeroService could update its subscription like this: | |
* | |
* this.ws.subscribe({ | |
* subscriptionId: this.heroes.subscriptionId, | |
* method: 'heroes.get_all_heroes', | |
* params: {page: 2} | |
* }); | |
* | |
* All of the observers subscribed to the existing WebSocketObservable instance | |
* would have the updated value published to them without needing to | |
* re-subscribe. In addition, making multiple calls to | |
* WebSocketService.subscribe() with the same subscriptionId always returns the | |
* same WebSocketObservable instance. | |
* | |
* An error return value from the server is passed to the error callbacks given
* to subscribe(). Unlike a standard Observable, an error does not terminate the
* stream, so values the server sends afterwards are still published. As with
* calls, a return value object with an "error" field is treated as an error
* response.
* | |
* | |
* Canceling Subscriptions | |
* ----------------------- | |
* If you've made a subscription and no longer want to receive updates, you can | |
* cancel it by passing the subscriptionId to the unsubscribe method, e.g. | |
* HeroService could do this: | |
* | |
* this.ws.unsubscribe(this.heroes.subscriptionId); | |
* | |
* This causes several things to happen. First, the corresponding subscription | |
* observable is completed. Second, all references to the subscription are | |
* removed from the service's internal data structure. Third, an unsubscribe | |
* action is sent to the server so the server no longer tracks the subscription. | |
*/ | |
import { Injectable } from '@angular/core'; | |
import { Subject, ReplaySubject, Subscription, PartialObserver } from 'rxjs'; | |
import { filter, first } from 'rxjs/operators'; | |
import * as _ from 'lodash'; | |
export declare type Callback = (data: Response) => void; | |
export declare type Errback = (error: string) => void; | |
export declare type LifecycleEvent = 'open' | 'close'; | |
declare const BASE: string; | |
/** | |
* Observable returned by WebSocketService.subscribe() which always publishes | |
* the latest value returned by the server for this subscription. This also | |
* exposes a "subscriptionId" field which identifies this subscription. | |
* | |
* The main difference between this and a traditional observable is that errors | |
* do not terminate the observable. Instead, the expectation is that we might | |
* receive an interleaving series of data and error responses from the server. | |
*/ | |
export class WebSocketObservable<T> extends ReplaySubject<T> { | |
/** | |
* The id of the subscription; this should be used to send RPC requests to | |
* update this subscription. | |
*/ | |
subscriptionId: string; | |
/** | |
* This is where we store the error callbacks passed in calls to subscribe. | |
* Because errors do not terminate the stream, we store them here and then | |
* invoke them in our error() method. | |
*/ | |
errbacks: Array<Errback> = []; | |
constructor(subscriptionId: string) { | |
super(1); | |
this.subscriptionId = subscriptionId; | |
} | |
/** | |
* As mentioned above, this differs from the normal subscribe() method by | |
* storing error callbacks separately and invoking them every time we get an | |
* error back from the server rather than firing once and then terminating | |
* the stream. After setting aside the error callback we dispatch to the | |
* parent class' subscribe() method with a noop error callback in its place. | |
*/ | |
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscription { | |
if (error) { | |
this.errbacks.push(error); | |
} | |
let observer = super.subscribe(<any>observerOrNext, (err: any) => { }, complete); | |
let inheritor = { | |
unsubscribe: () => { | |
observer.unsubscribe(); | |
let index = this.errbacks.indexOf(error); | |
if (index !== -1) { | |
this.errbacks.splice(index, 1); | |
} | |
} | |
}; | |
Object.setPrototypeOf(inheritor, observer); | |
return <Subscription>inheritor; | |
} | |
/** | |
* Unlike a normal Observable's error method, this fires the error callbacks | |
* without terminating the stream. | |
*/ | |
error(message: string): void { | |
this.errbacks.forEach(errback => { | |
try { | |
errback(message); | |
} catch (ex) { | |
console.error('Exception in error callback for WebSocket subscription', this.subscriptionId, ex); | |
} | |
}); | |
} | |
} | |
/** | |
* Format of Sideboard's WebSocket RPC requests. Instances of this class are | |
* sent to the server as serialized JSON objects. | |
*/ | |
export class Request { | |
readonly action: string; | |
readonly method: string; | |
readonly params: any; | |
readonly client: string; | |
readonly callback: string; | |
readonly subscription?: string; | |
constructor(c: NormalizedConfig) { | |
this.action = c.action;
this.method = c.method;
this.params = c.params;
this.callback = c.callbackId;
// both "client" and "subscription" carry the subscription id
this.client = c.subscriptionId;
this.subscription = c.subscriptionId;
} | |
} | |
/** | |
* Format of Sideboard's WebSocket RPC responses. This is parsed from the JSON | |
* objects we receive from the server. | |
*/ | |
export class Response { | |
readonly data?: any; | |
readonly error?: string; | |
readonly client?: string; | |
readonly callback?: string; | |
readonly subscription?: string; | |
// don't error if Sideboard adds other properties to the response | |
[propName: string]: any; | |
} | |
/** | |
* WebSocketService.call() takes an instance of this class, e.g. | |
* this.ws.call({method: 'foo.bar', params: [5, 6]}) | |
*/ | |
export class CallConfig { | |
readonly method: string; | |
readonly params?: any = undefined; | |
readonly timeout?: number = undefined; | |
} | |
/** | |
* WebSocketService.subscribe() takes an instance of this class, e.g. | |
* this.ws.subscribe({method: 'foo.bar', params: [5, 6]})
*/ | |
export class SubscribeConfig { | |
readonly method: string; | |
readonly params?: any = undefined; | |
readonly callbackId?: string = undefined; | |
readonly subscriptionId?: string = undefined; | |
} | |
/** | |
* WebSocketService keeps track of all calls waiting for responses and all | |
* active subscriptions. When a consumer invokes WebSocketService.subscribe or | |
* WebSocketService.call, the configuration gets normalized into an instance of | |
* this class and then stored in the WebSocketService.requests map. | |
* | |
* Anytime we receive a message from the server, we look up the NormalizedConfig | |
* instance that message corresponds to by the message's subscription or | |
* callback id, then invoke the callback or errback function. | |
*/ | |
export class NormalizedConfig { | |
method: string; | |
params: any; // positional (array) or keyword (object) arguments, as in CallConfig
single: boolean = false; | |
action?: string = undefined; | |
callbackId?: string = undefined; | |
subscriptionId?: string = undefined; | |
callback: Callback; | |
errback: Errback; | |
} | |
/** | |
* In addition to the three methods documented above, this class has several | |
* public fields, which are documented below. The reason why we make constants | |
* such as WS_URL public is that it allows applications to change these values. | |
* | |
* Instantiating this class doesn't automatically connect to the server; instead | |
* a connection is initiated when the first call or subscribe is made. | |
*/ | |
@Injectable() | |
export class WebSocketService { | |
/** | |
* Boolean variable that enables/disables console.log calls in the service | |
*/ | |
showDebugging: boolean = false; | |
/** | |
* URL we use to connect to the server; by default this is the host which
* served the current page with a /<BASE>/ws path, using wss if the page was
* loaded over https and ws otherwise.
*/ | |
WS_URL = (window.location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + '/' + BASE + '/ws'; | |
/** | |
* Sometimes a WebSocket connection goes dead without the socket actually | |
* being closed. In such cases we sometimes are able to send messages with | |
* no apparent error, but nothing is ever actually sent or received. To | |
* detect and recover from such cases, we periodically call "sideboard.poll" | |
* and if we don't get back a response then we close the connection and | |
* open a new one. | |
*/ | |
POLL_INTERVAL = 30000; | |
/** | |
* When we fail to connect to the server, we automatically retry using an | |
* exponential backoff. We start with this many milliseconds and reset to | |
* it after we successfully connect. | |
*/ | |
MIN_RECONNECT_WAIT = 1000; | |
/** | |
* The maximum amount of time to wait between reconnection attempts. | |
*/ | |
MAX_RECONNECT_WAIT = 30000; | |
/** | |
* When making an RPC call, this is the maximum amount of time to wait | |
* before we assume that no response will arrive and reject the promise. | |
*/ | |
DEFAULT_CALL_TIMEOUT = 10000; | |
/** | |
* WebSocket.close() takes a numeric status code and a reason string. This
* is the default code we use if the caller doesn't specify one; 1000
* indicates a normal closure.
*/ | |
DEFAULT_CLOSE_CODE = 1000; | |
/** | |
* WebSocket.close() also takes a human-readable reason string; this is the
* default reason we pass if the caller doesn't specify one.
*/ | |
DEFAULT_CLOSE_REASON = 'websocket closed via an explicit call to WebSocketService.close()'; | |
/** | |
* WebSocket instance we use to communicate with the server. Consumers | |
* shouldn't generally call this directly, but we expose it as a public | |
* field in case there's some reason why an application would need to bypass | |
* this class to use it directly. | |
*/ | |
ws: WebSocket = null; | |
/** | |
* Used to publish "open" and "close" events when we connect and disconnect | |
* from the server. Consumers wishing to take some action when this happens | |
* should subscribe to this Observable; the values published will be either
* the string "open" or the string "close".
*/ | |
readonly lifecycleEvents = new Subject<LifecycleEvent>(); | |
/** | |
* Sideboard's WebSocket RPC protocol has subscription and callback ids | |
* which are used to identify which request the responses correspond to. | |
* Sideboard doesn't care what these ids look like, so we use the format
* "subscription-X" and "callback-X". This counter fills in X with the
* next unused integer.
*/ | |
private counter = 1; | |
/** | |
* This tracks the current number of milliseconds to wait between attempts | |
* to reconnect to the server when the connection goes down. | |
*/ | |
private currReconnectWait = 1000; | |
/** | |
* This contains the config for all outstanding calls and subscriptions, | |
* indexed by the callback id (for calls) or subscription id. | |
*/ | |
private requests = new Map<string, NormalizedConfig>(); | |
/** | |
* This contains the WebSocketObservable objects for all outstanding | |
* subscriptions, indexed by their subscription ids. | |
*/ | |
private subscriptions = new Map<string, WebSocketObservable<any>>(); | |
/** | |
* This is where we store the ids returned by window.setTimeout for the | |
* "background task" which checks our WebSocket to see if it's still alive. | |
*/ | |
private poller: number = null; | |
/** | |
* Returns the status code of our WebSocket instance, or WebSocket.CLOSED if | |
* we don't currently have an instantiated WebSocket connection. | |
*/ | |
private getStatus = (): number => { | |
if (this.ws) { | |
return this.ws.readyState; | |
} else { | |
return WebSocket.CLOSED; | |
} | |
}; | |
/** | |
* Returns a string version of WebSocketService.getStatus() suitable for | |
* including in a log message. | |
*/ | |
private getStatusString = (): string => { | |
return this.isConnecting() ? 'CONNECTING' : | |
this.isOpen() ? 'OPEN' : | |
this.isClosing() ? 'CLOSING' : 'CLOSED'; | |
}; | |
/** | |
* Returns a boolean indicating whether there's currently a WebSocket | |
* instance in the open state, i.e. able to send and receive messages. | |
*/ | |
private isOpen = (): boolean => { | |
return this.getStatus() === WebSocket.OPEN; | |
}; | |
/** | |
* Returns a boolean indicating whether there's currently a WebSocket | |
* instance in the connecting state, i.e. not yet ready for messages. | |
*/ | |
private isConnecting = (): boolean => { | |
return this.getStatus() === WebSocket.CONNECTING; | |
}; | |
/** | |
* Returns a boolean indicating whether there's currently a WebSocket
* instance in the closing state, i.e. about to close, after which a new
* connection must be created before messages can be sent.
*/ | |
private isClosing = (): boolean => { | |
return this.getStatus() === WebSocket.CLOSING; | |
}; | |
/** | |
* Returns a boolean indicating whether there's no current active WebSocket | |
* instance, indicating that we must create one before messages can be sent. | |
*/ | |
private isClosed = (): boolean => { | |
return this.getStatus() === WebSocket.CLOSED; | |
}; | |
/** | |
* Given a CallConfig object and a pair of callback/errback functions, return
* a NormalizedConfig object for this call.
*/ | |
private normalizeCall = (conf: CallConfig, callback: Callback, errback: Errback): NormalizedConfig => { | |
return { | |
single: true, | |
method: conf.method, | |
params: conf.params, | |
callback: callback, | |
errback: errback, | |
callbackId: 'callback-' + this.counter++ | |
}; | |
}; | |
/** | |
* Given a SubscribeConfig object, a subscription id, and a pair of
* callback/errback functions, return a NormalizedConfig object for this
* subscription.
*/ | |
private normalizeSubscribe = (conf: SubscribeConfig, subscriptionId: string, callback: Callback, errback: Errback): NormalizedConfig => { | |
return { | |
single: false, | |
method: conf.method, | |
params: conf.params, | |
subscriptionId: subscriptionId, | |
callbackId: conf.callbackId, | |
callback: callback, | |
errback: errback | |
}; | |
}; | |
/** | |
* Callback we set as our WebSocket.onopen handler to kick off the periodic | |
* polling, refire any active subscriptions, and fire our open event. | |
*/ | |
private onOpen = () => { | |
this.currReconnectWait = this.MIN_RECONNECT_WAIT; | |
this.schedulePoll(); | |
this.refireSubscriptions(); | |
this.lifecycleEvents.next('open'); | |
}; | |
/** | |
* Callback we set as our WebSocket.onclose handler to stop the periodic | |
* poll, fire our close event, and schedule an automatic reconnect attempt. | |
*/ | |
private onClose = () => { | |
this.stopPolling(); | |
window.setTimeout(this.connect, this.currReconnectWait); | |
this.currReconnectWait = Math.min(this.MAX_RECONNECT_WAIT, 2 * this.currReconnectWait); | |
this.lifecycleEvents.next('close'); | |
}; | |
/** | |
* Callback we set as our WebSocket.onerror handler to close the connection. | |
*/ | |
private onError = (error) => { | |
console.error('WebSocket.onerror:', error); | |
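// the error argument is an event object rather than a string, so pass a fixed reason
this.close(this.DEFAULT_CLOSE_CODE, 'closing websocket due to an error event');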
}; | |
/** | |
* Callback we set as our WebSocket.onmessage handler. The only messages we | |
* expect to receive from Sideboard are responses to our RPC requests, so | |
* this method attempts to parse the message as a JSON object and pass it to | |
* WebSocketService.handleMessage, logging to the console on failure. | |
*/ | |
private onMessage = (event: any) => { | |
let json; | |
try { | |
json = JSON.parse(event.data || 'null'); | |
} catch (ex) { | |
console.error('websocket message parse error', event, ex); | |
return; | |
} | |
if (!json || typeof json !== 'object') { | |
console.error('websocket message parsed to a non-object', json); | |
} else { | |
this.handleMessage(json as Response); | |
} | |
}; | |
/** | |
* Given a Response from the server, we use the callback and subscription | |
* ids to identify which request the message corresponds to and then invoke | |
* the appropriate callback or errback function. If the Response is for a | |
* call rather than a subscription, we stop tracking the original request. | |
*/ | |
private handleMessage = (response: Response) => { | |
let conf = this.requests.get(response.subscription || response.client || response.callback); | |
if (conf) { | |
if (this.showDebugging) { | |
console.log('received WebSocket RPC response:', response); | |
} | |
let funcAttr = response.error ? 'errback' : 'callback', | |
dataAttr = response.error ? 'error' : 'data'; | |
try { | |
conf[funcAttr](response[dataAttr]); | |
} catch (ex) { | |
console.error('Error executing websocket', funcAttr, 'callback:', ex); | |
} | |
if (conf.single) { | |
this.requests.delete(conf.callbackId); | |
} | |
} else { | |
console.error('unknown subscription and/or callback id', response); | |
} | |
}; | |
/** | |
* When a consumer makes a call to WebSocketService.subscribe() and then our | |
* connection goes dead and reconnects, we want to re-send all of the RPC | |
* subscriptions which the consumer made after the connection is reopened. | |
*/ | |
private refireSubscriptions = () => { | |
this.requests.forEach((conf) => { | |
if (!conf.single) { | |
this.send(conf); | |
} | |
}); | |
}; | |
/** | |
* This method directly instantiates the WebSocket class and sets this.ws. | |
* It exists as its own method for easy mocking in our unit tests. | |
*/ | |
private _connect = () => { | |
this.ws = new WebSocket(this.WS_URL); | |
this.ws.onopen = this.onOpen; | |
this.ws.onclose = this.onClose; | |
this.ws.onerror = this.onError; | |
this.ws.onmessage = this.onMessage; | |
}; | |
/** | |
* Attempt to connect to the server if we aren't already connected. Either | |
* way, once we are connected, invoke the given callback. | |
*/ | |
private connect = (callback: () => void = null) => { | |
callback = callback || (() => { }); // default to a noop | |
if (this.isConnecting()) { | |
this.onNext('open', callback); | |
} else if (this.isClosing()) { | |
this.onNext('close', () => { | |
this.connect(callback); | |
}); | |
} else if (this.isClosed()) { | |
this._connect(); | |
this.onNext('open', callback); | |
} else if (this.isOpen()) { | |
callback(); | |
} | |
}; | |
/** | |
* Close our WebSocket connection and then dereference it. This method | |
* swallows all exceptions and is always safe to call even if we don't have | |
* an open connection when it's invoked. | |
* | |
* This method takes a code and reason parameter, which are passed directly | |
* to WebSocket.close(). If omitted, we fill them in with the defaults | |
* defined in this class. | |
*/ | |
private close = (code: number = null, reason: string = null) => { | |
if (this.ws) { | |
try { | |
if (!this.isClosed()) { | |
this.ws.onopen = this.ws.onclose = this.ws.onerror = this.ws.onmessage = null; | |
this.ws.close(code || this.DEFAULT_CLOSE_CODE, reason || this.DEFAULT_CLOSE_REASON); | |
} | |
this.onClose(); | |
} catch (ex) { | |
console.error('error calling close on', this.getStatusString(), 'websocket', ex); | |
} | |
this.ws = null; | |
} | |
}; | |
/** | |
* Given a request configuration, instantiate a Request, serialize it to | |
* JSON, and send it on our open WebSocket connection. This method should
* only be called while the connection is open, e.g. from within the callback
* passed to WebSocketService.connect():
* this.connect(() => {
* this.send(...); // we are now guaranteed to be connected
* });
*/ | |
private send = (conf: NormalizedConfig) => { | |
if (conf.method) { | |
this.requests.set(conf.subscriptionId || conf.callbackId, conf); | |
} | |
let request = new Request(conf); | |
if (this.showDebugging) { | |
console.log('sending WebSocket RPC message', request); | |
} | |
this.ws.send(JSON.stringify(request)); | |
}; | |
/** | |
* WebSocketService doesn't attempt to connect a WebSocket to the server | |
* until the first time a consumer makes a call or subscription. This | |
* utility method takes a config and then attempts to connect if we're not | |
* already connected and then sends the message once connected. | |
*/ | |
private connectAndSend = (conf: NormalizedConfig) => { | |
this.connect(() => { | |
this.send(conf); | |
}); | |
}; | |
/** | |
* Send a message to the server and ensure that we get back a response. If | |
* we do, then schedule the next poll, and if we don't then close the | |
* WebSocket and immediately attempt to reconnect. | |
*/ | |
private poll = () => { | |
this.call('sideboard.poll').then(this.schedulePoll).catch(() => { | |
console.error('closing websocket due to poll failure; will attempt to reconnect'); | |
this.close(this.DEFAULT_CLOSE_CODE, 'poll failed'); | |
this.connect(); | |
}); | |
}; | |
/** | |
* Cancel the next scheduled poll if it exists and schedule a new one. | |
*/ | |
private schedulePoll = () => { | |
this.stopPolling(); | |
this.poller = window.setTimeout(this.poll, this.POLL_INTERVAL); | |
}; | |
/** | |
* Called when our connection closes to stop the next scheduled poll if one | |
* exists. If one does not exist, this method is effectively a noop. | |
*/ | |
private stopPolling = () => { | |
window.clearTimeout(this.poller); | |
}; | |
/** | |
* Utility function for invoking a callback only once the next time we | |
* fire the given lifecycle event (i.e. "open" or "close"). This is exposed | |
* for consumers who want this kind of one-off notification. | |
*/ | |
onNext = (event: string, callback: () => void) => { | |
this.lifecycleEvents.pipe( | |
filter((e, i) => e === event), | |
first(), | |
) | |
.subscribe(callback); | |
}; | |
/** | |
* Make a one-off RPC request to the server, returning a promise which is | |
* fulfilled with the return value we get from the server and rejected if we | |
* get back an error or don't get back a response at all. | |
*/ | |
call = (req: string | CallConfig, ...params): Promise<Response> => { | |
let callConf: CallConfig = typeof req === 'string' ? { | |
method: req, | |
params: params | |
} : req; | |
return new Promise<Response>((resolve, reject) => { | |
let normalizedConf: NormalizedConfig, timeoutId: number; | |
let completed = false; | |
let complete = () => { | |
completed = true; | |
this.requests.delete(normalizedConf.callbackId); | |
window.clearTimeout(timeoutId); | |
}; | |
let timeoutMs = callConf.timeout || this.DEFAULT_CALL_TIMEOUT; | |
timeoutId = window.setTimeout(() => { | |
if (!completed) { | |
console.error('no response received for', timeoutMs, 'milliseconds', callConf); | |
complete(); | |
reject('websocket call timed out'); | |
} | |
}, timeoutMs); | |
normalizedConf = this.normalizeCall(callConf, (data) => { | |
complete(); | |
if (Object.keys(data || {}).indexOf('error') !== -1) { | |
reject(data.error); | |
} else { | |
resolve(data); | |
} | |
}, (error) => { | |
complete(); | |
reject(error); | |
}); | |
this.connectAndSend(normalizedConf); | |
}); | |
}; | |
/** | |
* Creates or updates a subscription, returning a WebSocketObservable with | |
* a subscription id and which always has the latest returned value. | |
*/ | |
subscribe = (req: SubscribeConfig): WebSocketObservable<any> => { | |
let subscriptionId = req.subscriptionId || ('subscription-' + this.counter++); | |
let results = this.subscriptions.get(subscriptionId) || new WebSocketObservable<any>(subscriptionId); | |
let handleError = (error: string) => { | |
results.error(error); | |
}; | |
let processData = (data: Response) => { | |
if (Object.keys(data || {}).indexOf('error') !== -1) { | |
handleError(data.error); | |
} else { | |
results.next(data); | |
} | |
}; | |
let conf = this.normalizeSubscribe(req, subscriptionId, processData, handleError); | |
this.subscriptions.set(subscriptionId, results); | |
// When .subscribe() is called with the same subscription id and a | |
// different method or parameters, we update the subscription and send | |
// the new request to the server. However, if the method and parameters | |
// are exactly the same as the last call, we simply return the cached | |
// subscription without sending anything to the server. | |
const request = this.requests.get(conf.subscriptionId); | |
if (!this.isOpen() || !request || request.method !== conf.method || !_.isEqual(request.params, conf.params)) { | |
this.connectAndSend(conf); | |
} | |
return results; | |
}; | |
/** | |
* This method takes a number of subscription ids and cancels them on the | |
* server and discards them from our internal data structures. | |
*/ | |
unsubscribe = (...params) => { | |
for (let subscriptionId of params) { | |
let subscription = this.subscriptions.get(subscriptionId); | |
if (subscription) { | |
subscription.complete(); | |
} | |
this.requests.delete(subscriptionId); | |
this.subscriptions.delete(subscriptionId); | |
} | |
if (this.isOpen() && params.length) { | |
let message = { action: 'unsubscribe', client: params }; | |
if (this.showDebugging) { | |
console.log('sending WebSocket RPC unsubscribe action', message); | |
} | |
this.ws.send(JSON.stringify(message)); | |
} | |
}; | |
} |
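
The routing step described for handleMessage (look up the tracked request by subscription, client, or callback id, invoke its callback or errback, and forget one-off calls after a single response) can be tried without a WebSocket or Angular. The following is a minimal sketch under stated assumptions, not the service's own code: `SketchResponse`, `SketchConfig`, `register`, and `dispatch` are hypothetical names introduced here for illustration, and the try/catch and logging from the original are left out for brevity.

```typescript
// Hypothetical, simplified stand-in for the Response class above.
interface SketchResponse {
    data?: unknown;
    error?: string;
    client?: string;
    callback?: string;
    subscription?: string;
}

// Hypothetical, simplified stand-in for NormalizedConfig.
interface SketchConfig {
    single: boolean;                      // true for one-off calls, false for subscriptions
    id: string;                           // callback id or subscription id
    callback: (data: unknown) => void;
    errback: (error: string) => void;
}

// Outstanding calls and active subscriptions, keyed by id.
const pending = new Map<string, SketchConfig>();

function register(conf: SketchConfig): void {
    pending.set(conf.id, conf);
}

// Routes one response; returns false when no tracked request matches its ids.
function dispatch(response: SketchResponse): boolean {
    const id = response.subscription || response.client || response.callback;
    const conf = id ? pending.get(id) : undefined;
    if (!conf) {
        return false;
    }
    if (response.error) {
        conf.errback(response.error);
    } else {
        conf.callback(response.data);
    }
    if (conf.single) {
        pending.delete(conf.id);          // a call is done after its first response
    }
    return true;
}

// Usage: one call and one subscription, with an error interleaved in the
// subscription's stream of values.
const log: string[] = [];
register({
    single: true,
    id: 'callback-1',
    callback: (d) => { log.push('call:' + String(d)); },
    errback: (e) => { log.push('call-error:' + e); },
});
register({
    single: false,
    id: 'subscription-2',
    callback: (d) => { log.push('sub:' + String(d)); },
    errback: (e) => { log.push('sub-error:' + e); },
});

dispatch({ callback: 'callback-1', data: 42 });
dispatch({ client: 'subscription-2', data: 'a' });
dispatch({ client: 'subscription-2', error: 'boom' });
dispatch({ client: 'subscription-2', data: 'b' });

// The call was removed after its first response, so this one is unmatched.
const repeatHandled = dispatch({ callback: 'callback-1', data: 43 });
```

In this sketch the subscription keeps receiving values after the error response, which mirrors the non-terminating error handling of WebSocketObservable, while the call's entry is dropped as soon as it has been answered.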