Skip to content

Instantly share code, notes, and snippets.

@EliAndrewC
Last active October 30, 2019 17:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EliAndrewC/05ebf79f520a1ab98c621c8c278ad30f to your computer and use it in GitHub Desktop.
Save EliAndrewC/05ebf79f520a1ab98c621c8c278ad30f to your computer and use it in GitHub Desktop.
/**
* This modules exports an Angular 2 WebSocketService implementing Sideboard's
* WebSocket RPC protocol. There are three main actions exposed to consumers:
* calling RPC methods, making subscriptions, and canceling subscriptions.
*
*
* Calling RPC Methods
* -------------------
* Consumers can make one-off RPC calls which return promises:
*
* this.ws.call('foo.bar', 'baz', 'baf').then(onSuccess).catch(onError);
*
* This translates into an RPC request like this:
*
* {"method": "foo.bar", "params": ["baz", "baf"], "callback": "callback-42"}
*
* The promise is resolved when we get a non-error response back from the server
* and rejected when we get an error response or time out. Any response object
* with an "error" field is treated as an error.
*
* Consumers can also use the long-form approach to making calls by passing in
* a CallConfig object instead of a method name and list of parameters:
*
* this.ws.call({
* method: "foo.bar",
* params: ["baz", "baf"]
* }).then(onSuccess).catch(onError);
*
* This would generate an identical RPC request to the previous example, but is
* more flexible in that it allows you to use keyword arguments instead of
* positional arguments, as well as specify your own custom timeout value.
*
*
* Making Subscriptions
* --------------------
* Consumers can make "subscription" requests to the server, which returns a
* value immediately and then also returns an updated value every time it
* changes. These subscription calls return a WebSocketObservable object, which
* is an Observable with an additional subscriptionId field which identifies it.
*
* For example, if we were implementing the Angular 2 Tour of Heroes tutorial
* using WebSocketService, we might do something like this:
*
* @Injectable()
* export class HeroService {
* heroes: WebSocketObservable<any>;
* constructor (private ws: WebSocketService) {
* this.heroes = this.ws.subscribe({method: 'heroes.get_all_heroes'});
* }
* }
*
* Then other classes could use the standard Observable.subscribe() method to
* get the latest data which was returned from the server, e.g.
*
* this.heroService.heroes.subscribe(processDataCallback, handleErrorCallback);
*
* WebSocketObservables can also be updated. For example, if our heroes service
* had paging, instead of canceling our subscription and creating a new one, the
* HeroService could update its subscription like this:
*
* this.ws.subscribe({
* subscriptionId: this.heroes.subscriptionId,
* method: 'heroes.get_all_heroes',
* params: {page: 2}
* });
*
* All of the observers subscribed to the existing WebSocketObservable instance
* would have the updated value published to them without needing to
* re-subscribe. In addition, making multiple calls to
* WebSocketService.subscribe() with the same subscriptionId always returns the
* same WebSocketObservable instance.
*
* An error return value from the server will emit an error to the observable,
* which will stop future values from being published. If you want to resume,
* you'll need unsubscribe (see below) and re-subscribe with valid parameters.
* As with calls, a return value object with an "error" field is treated as an
* error response.
*
*
* Canceling Subscriptions
* -----------------------
* If you've made a subscription and no longer want to receive updates, you can
* cancel it by passing the subscriptionId to the unsubscribe method, e.g.
* HeroService could do this:
*
* this.ws.unsubscribe(this.heroes.subscriptionId);
*
* This causes several things to happen. First, the corresponding subscription
* observable is completed. Second, all references to the subscription are
* removed from the service's internal data structure. Third, an unsubscribe
* action is sent to the server so the server no longer tracks the subscription.
*/
import { Injectable } from '@angular/core';
import { Subject, ReplaySubject, Subscription, PartialObserver } from 'rxjs';
import { filter, first } from 'rxjs/operators';
import * as _ from 'lodash';
export declare type Callback = (data: Response) => void;
export declare type Errback = (error: string) => void;
export declare type LifecycleEvent = 'open' | 'close';
declare const BASE: string;
/**
* Observable returned by WebSocketService.subscribe() which always publishes
* the latest value returned by the server for this subscription. This also
* exposes a "subscriptionId" field which identifies this subscription.
*
* The main difference between this and a traditional observable is that errors
* do not terminate the observable. Instead, the expectation is that we might
* receive an interleaving series of data and error responses from the server.
*/
export class WebSocketObservable<T> extends ReplaySubject<T> {
/**
* The id of the subscription; this should be used to send RPC requests to
* update this subscription.
*/
subscriptionId: string;
/**
* This is where we store the error callbacks passed in calls to subscribe.
* Because errors do not terminate the stream, we store them here and then
* invoke them in our error() method.
*/
errbacks: Array<Errback> = [];
constructor(subscriptionId: string) {
super(1);
this.subscriptionId = subscriptionId;
}
/**
* As mentioned above, this differs from the normal subscribe() method by
* storing error callbacks separately and invoking them every time we get an
* error back from the server rather than firing once and then terminating
* the stream. After setting aside the error callback we dispatch to the
* parent class' subscribe() method with a noop error callback in its place.
*/
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscription {
if (error) {
this.errbacks.push(error);
}
let observer = super.subscribe(<any>observerOrNext, (err: any) => { }, complete);
let inheritor = {
unsubscribe: () => {
observer.unsubscribe();
let index = this.errbacks.indexOf(error);
if (index !== -1) {
this.errbacks.splice(index, 1);
}
}
};
Object.setPrototypeOf(inheritor, observer);
return <Subscription>inheritor;
}
/**
* Unlike a normal Observable's error method, this fires the error callbacks
* without terminating the stream.
*/
error(message: string): void {
this.errbacks.forEach(errback => {
try {
errback(message);
} catch (ex) {
console.error('Exception in error callback for WebSocket subscription', this.subscriptionId, ex);
}
});
}
}
/**
* Format of Sideboard's WebSocket RPC requests. Instances of this class are
* sent to the server as serialized JSON objects.
*/
export class Request {
readonly action: string;
readonly method: string;
readonly params: any;
readonly client: string;
readonly callback: string;
readonly subscription?: string;
constructor(c: NormalizedConfig) {
[this.action, this.method, this.params, this.callback, this.client, this.subscription] = [c.action, c.method, c.params, c.callbackId, c.subscriptionId, c.subscriptionId];
}
}
/**
* Format of Sideboard's WebSocket RPC responses. This is parsed from the JSON
* objects we receive from the server.
*/
export class Response {
readonly data?: any;
readonly error?: string;
readonly client?: string;
readonly callback?: string;
readonly subscription?: string;
// don't error if Sideboard adds other properties to the response
[propName: string]: any;
}
/**
* WebSocketService.call() takes an instance of this class, e.g.
* this.ws.call({method: 'foo.bar', params: [5, 6]})
*/
export class CallConfig {
readonly method: string;
readonly params?: any = undefined;
readonly timeout?: number = undefined;
}
/**
* WebSocketService.subscribe() takes an instance of this class, e.g.
* this.ws.subscribe({method: 'foo.bar', 'params': [5, 6]})
*/
export class SubscribeConfig {
readonly method: string;
readonly params?: any = undefined;
readonly callbackId?: string = undefined;
readonly subscriptionId?: string = undefined;
}
/**
* WebSocketService keeps track of all calls waiting for responses and all
* active subscriptions. When a consumer invokes WebSocketService.subscribe or
* WebSocketService.call, the configuration gets normalized into an instance of
* this class and then stored in the WebSocketService.requests map.
*
* Anytime we receive a message from the server, we look up the NormalizedConfig
* instance that message corresponds to by the message's subscription or
* callback id, then invoke the callback or errback function.
*/
export class NormalizedConfig {
method: string;
params: string;
single: boolean = false;
action?: string = undefined;
callbackId?: string = undefined;
subscriptionId?: string = undefined;
callback: Callback;
errback: Errback;
}
/**
* In addition to the three methods documented above, this class has several
* public fields, which are documented below. The reason why we make constants
* such as WS_URL public is that it allows applications to change these values.
*
* Instantiating this class doesn't automatically connect to the server; instead
* a connection is initiated when the first call or subscribe is made.
*/
@Injectable()
export class WebSocketService {
/**
* Boolean variable that enables/disables console.log calls in the service
*/
showDebugging: boolean = false;
/**
* URL we use to connect to the server; by default this is the server which
* served the current page with a /ws path.
*/
WS_URL = (window.location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + '/' + BASE + '/ws';
/**
* Sometimes a WebSocket connection goes dead without the socket actually
* being closed. In such cases we sometimes are able to send messages with
* no apparent error, but nothing is ever actually sent or received. To
* detect and recover from such cases, we periodically call "sideboard.poll"
* and if we don't get back a response then we close the connection and
* open a new one.
*/
POLL_INTERVAL = 30000;
/**
* When we fail to connect to the server, we automatically retry using an
* exponential backoff. We start with this many milliseconds and reset to
* it after we successfully connect.
*/
MIN_RECONNECT_WAIT = 1000;
/**
* The maximum amount of time to wait between reconnection attempts.
*/
MAX_RECONNECT_WAIT = 30000;
/**
* When making an RPC call, this is the maximum amount of time to wait
* before we assume that no response will arrive and reject the promise.
*/
DEFAULT_CALL_TIMEOUT = 10000;
/**
* WebSocket objects have a close method which we expect to call with a
* reason, which is logged to the Javascript console. This is the default
* code we use if the caller doesn't specify one.
*/
DEFAULT_CLOSE_CODE = 1000;
/**
* WebSocket objects take a string when they're closed which they log; this
* is the default message if the caller doesn't specify one.
*/
DEFAULT_CLOSE_REASON = 'websocket closed via an explicit call to WebSocketService.close()';
/**
* WebSocket instance we use to communicate with the server. Consumers
* shouldn't generally call this directly, but we expose it as a public
* field in case there's some reason why an application would need to bypass
* this class to use it directly.
*/
ws: WebSocket = null;
/**
* Used to publish "open" and "close" events when we connect and disconnect
* from the server. Consumers wishing to take some action when this happens
* should subscribe to this Observable; the values published will be either
* the string "open" or the string "closed".
*/
readonly lifecycleEvents = new Subject<LifecycleEvent>();
/**
* Sideboard's WebSocket RPC protocol has subscription and callback ids
* which are used to identify which request the responses correspond to.
* Sideboard doesn't care what these ids look like, so we use the format
* "subscription-X" and "callback-X". This counter fill in X with the
* next unused integer.
*/
private counter = 1;
/**
* This tracks the current number of milliseconds to wait between attempts
* to reconnect to the server when the connection goes down.
*/
private currReconnectWait = 1000;
/**
* This contains the config for all outstanding calls and subscriptions,
* indexed by the callback id (for calls) or subscription id.
*/
private requests = new Map<string, NormalizedConfig>();
/**
* This contains the WebSocketObservable objects for all outstanding
* subscriptions, indexed by their subscription ids.
*/
private subscriptions = new Map<string, WebSocketObservable<any>>();
/**
* This is where we store the ids returned by window.setTimeout for the
* "background task" which checks our WebSocket to see if it's still alive.
*/
private poller: number = null;
/**
* Returns the status code of our WebSocket instance, or WebSocket.CLOSED if
* we don't currently have an instantiated WebSocket connection.
*/
private getStatus = (): number => {
if (this.ws) {
return this.ws.readyState;
} else {
return WebSocket.CLOSED;
}
};
/**
* Returns a string version of WebSocketService.getStatus() suitable for
* including in a log message.
*/
private getStatusString = (): string => {
return this.isConnecting() ? 'CONNECTING' :
this.isOpen() ? 'OPEN' :
this.isClosing() ? 'CLOSING' : 'CLOSED';
};
/**
* Returns a boolean indicating whether there's currently a WebSocket
* instance in the open state, i.e. able to send and receive messages.
*/
private isOpen = (): boolean => {
return this.getStatus() === WebSocket.OPEN;
};
/**
* Returns a boolean indicating whether there's currently a WebSocket
* instance in the connecting state, i.e. not yet ready for messages.
*/
private isConnecting = (): boolean => {
return this.getStatus() === WebSocket.CONNECTING;
};
/**
* Returns a boolean indicating whether there's currently a WebSocket
* instance in the closing state, i.e. about to close and requiring that a
* new connection will soon need to be created.
*/
private isClosing = (): boolean => {
return this.getStatus() === WebSocket.CLOSING;
};
/**
* Returns a boolean indicating whether there's no current active WebSocket
* instance, indicating that we must create one before messages can be sent.
*/
private isClosed = (): boolean => {
return this.getStatus() === WebSocket.CLOSED;
};
/**
* Given a CallConfig object and pair of callback/errback functions, return
* NormalizedConfig object for this call.
*/
private normalizeCall = (conf: CallConfig, callback: Callback, errback: Errback): NormalizedConfig => {
return {
single: true,
method: conf.method,
params: conf.params,
callback: callback,
errback: errback,
callbackId: 'callback-' + this.counter++
};
};
/**
* Given a SubscribeConfig object, a subscription id, and pair of callback
* /errback functions, return a NormalizedConfig object for this subscription.
*/
private normalizeSubscribe = (conf: SubscribeConfig, subscriptionId: string, callback: Callback, errback: Errback): NormalizedConfig => {
return {
single: false,
method: conf.method,
params: conf.params,
subscriptionId: subscriptionId,
callbackId: conf.callbackId,
callback: callback,
errback: errback
};
};
/**
* Callback we set as our WebSocket.onopen handler to kick off the periodic
* polling, refire any active subscriptions, and fire our open event.
*/
private onOpen = () => {
this.currReconnectWait = this.MIN_RECONNECT_WAIT;
this.schedulePoll();
this.refireSubscriptions();
this.lifecycleEvents.next('open');
};
/**
* Callback we set as our WebSocket.onclose handler to stop the periodic
* poll, fire our close event, and schedule an automatic reconnect attempt.
*/
private onClose = () => {
this.stopPolling();
window.setTimeout(this.connect, this.currReconnectWait);
this.currReconnectWait = Math.min(this.MAX_RECONNECT_WAIT, 2 * this.currReconnectWait);
this.lifecycleEvents.next('close');
};
/**
* Callback we set as our WebSocket.onerror handler to close the connection.
*/
private onError = (error) => {
console.error('WebSocket.onerror:', error);
this.close(this.DEFAULT_CLOSE_CODE, error);
};
/**
* Callback we set as our WebSocket.onmessage handler. The only messages we
* expect to receive from Sideboard are responses to our RPC requests, so
* this method attempts to parse the message as a JSON object and pass it to
* WebSocketService.handleMessage, logging to the console on failure.
*/
private onMessage = (event: any) => {
let json;
try {
json = JSON.parse(event.data || 'null');
} catch (ex) {
console.error('websocket message parse error', event, ex);
return;
}
if (!json || typeof json !== 'object') {
console.error('websocket message parsed to a non-object', json);
} else {
this.handleMessage(json as Response);
}
};
/**
* Given a Response from the server, we use the callback and subscription
* ids to identify which request the message corresponds to and then invoke
* the appropriate callback or errback function. If the Response is for a
* call rather than a subscription, we stop tracking the original request.
*/
private handleMessage = (response: Response) => {
let conf = this.requests.get(response.subscription || response.client || response.callback);
if (conf) {
if (this.showDebugging) {
console.log('received WebSocket RPC response:', response);
}
let funcAttr = response.error ? 'errback' : 'callback',
dataAttr = response.error ? 'error' : 'data';
try {
conf[funcAttr](response[dataAttr]);
} catch (ex) {
console.error('Error executing websocket', funcAttr, 'callback:', ex);
}
if (conf.single) {
this.requests.delete(conf.callbackId);
}
} else {
console.error('unknown subscription and/or callback id', response);
}
};
/**
* When a consumer makes a call to WebSocketService.subscribe() and then our
* connection goes dead and reconnects, we want to re-send all of the RPC
* subscriptions which the consumer made after the connection is reopened.
*/
private refireSubscriptions = () => {
this.requests.forEach((conf) => {
if (!conf.single) {
this.send(conf);
}
});
};
/**
* This method directly instantiates the WebSocket class and sets this.ws.
* It exists as its own method for easy mocking in our unit tests.
*/
private _connect = () => {
this.ws = new WebSocket(this.WS_URL);
this.ws.onopen = this.onOpen;
this.ws.onclose = this.onClose;
this.ws.onerror = this.onError;
this.ws.onmessage = this.onMessage;
};
/**
* Attempt to connect to the server if we aren't already connected. Either
* way, once we are connected, invoke the given callback.
*/
private connect = (callback: () => void = null) => {
callback = callback || (() => { }); // default to a noop
if (this.isConnecting()) {
this.onNext('open', callback);
} else if (this.isClosing()) {
this.onNext('close', () => {
this.connect(callback);
});
} else if (this.isClosed()) {
this._connect();
this.onNext('open', callback);
} else if (this.isOpen()) {
callback();
}
};
/**
* Close our WebSocket connection and then dereference it. This method
* swallows all exceptions and is always safe to call even if we don't have
* an open connection when it's invoked.
*
* This method takes a code and reason parameter, which are passed directly
* to WebSocket.close(). If omitted, we fill them in with the defaults
* defined in this class.
*/
private close = (code: number = null, reason: string = null) => {
if (this.ws) {
try {
if (!this.isClosed()) {
this.ws.onopen = this.ws.onclose = this.ws.onerror = this.ws.onmessage = null;
this.ws.close(code || this.DEFAULT_CLOSE_CODE, reason || this.DEFAULT_CLOSE_REASON);
}
this.onClose();
} catch (ex) {
console.error('error calling close on', this.getStatusString(), 'websocket', ex);
}
this.ws = null;
}
};
/**
* Given a request configuration, instantiate a Request, serialize it to
* JSON, and send it on our open WebSocket connection. This method should
* only be called if our connection is currently active by being invoked
* in the callback passed to WebSocketService.connect() e.g.
* this.connect(() => {
* this.send(...); // we are now guaranteed to be connected
* });
*/
private send = (conf: NormalizedConfig) => {
if (conf.method) {
this.requests.set(conf.subscriptionId || conf.callbackId, conf);
}
let request = new Request(conf);
if (this.showDebugging) {
console.log('sending WebSocket RPC message', request);
}
this.ws.send(JSON.stringify(request));
};
/**
* WebSocketService doesn't attempt to connect a WebSocket to the server
* until the first time a consumer makes a call or subscription. This
* utility method takes a config and then attempts to connect if we're not
* already connected and then sends the message once connected.
*/
private connectAndSend = (conf: NormalizedConfig) => {
this.connect(() => {
this.send(conf);
});
};
/**
* Send a message to the server and ensure that we get back a response. If
* we do, then schedule the next poll, and if we don't then close the
* WebSocket and immediately attempt to reconnect.
*/
private poll = () => {
this.call('sideboard.poll').then(this.schedulePoll).catch(() => {
console.error('closing websocket due to poll failure; will attempt to reconnect');
this.close(this.DEFAULT_CLOSE_CODE, 'poll failed');
this.connect();
});
};
/**
* Cancel the next scheduled poll if it exists and schedule a new one.
*/
private schedulePoll = () => {
this.stopPolling();
this.poller = window.setTimeout(this.poll, this.POLL_INTERVAL);
};
/**
* Called when our connection closes to stop the next scheduled poll if one
* exists. If one does not exist, this method is effectively a noop.
*/
private stopPolling = () => {
window.clearTimeout(this.poller);
};
/**
* Utility function for invoking a callback only once the next time we
* fire the given lifecycle event (i.e. "open" or "close"). This is exposed
* for consumers who want this kind of one-off notification.
*/
onNext = (event: string, callback: () => void) => {
this.lifecycleEvents.pipe(
filter((e, i) => e === event),
first(),
)
.subscribe(callback);
};
/**
* Make a one-off RPC request to the server, returning a promise which is
* fulfilled with the return value we get from the server and rejected if we
* get back an error or don't get back a response at all.
*/
call = (req: string | CallConfig, ...params): Promise<Response> => {
let callConf: CallConfig = typeof req === 'string' ? {
method: req,
params: params
} : req;
return new Promise<Response>((resolve, reject) => {
let normalizedConf: NormalizedConfig, timeoutId: number;
let completed = false;
let complete = () => {
completed = true;
this.requests.delete(normalizedConf.callbackId);
window.clearTimeout(timeoutId);
};
let timeoutMs = callConf.timeout || this.DEFAULT_CALL_TIMEOUT;
timeoutId = window.setTimeout(() => {
if (!completed) {
console.error('no response received for', timeoutMs, 'milliseconds', callConf);
complete();
reject('websocket call timed out');
}
}, timeoutMs);
normalizedConf = this.normalizeCall(callConf, (data) => {
complete();
if (Object.keys(data || {}).indexOf('error') !== -1) {
reject(data.error);
} else {
resolve(data);
}
}, (error) => {
complete();
reject(error);
});
this.connectAndSend(normalizedConf);
});
};
/**
* Creates or updates a subscription, returning a WebSocketObservable with
* a subscription id and which always has the latest returned value.
*/
subscribe = (req: SubscribeConfig): WebSocketObservable<any> => {
let subscriptionId = req.subscriptionId || ('subscription-' + this.counter++);
let results = this.subscriptions.get(subscriptionId) || new WebSocketObservable<any>(subscriptionId);
let handleError = (error: string) => {
results.error(error);
};
let processData = (data: Response) => {
if (Object.keys(data || {}).indexOf('error') !== -1) {
handleError(data.error);
} else {
results.next(data);
}
};
let conf = this.normalizeSubscribe(req, subscriptionId, processData, handleError);
this.subscriptions.set(subscriptionId, results);
// When .subscribe() is called with the same subscription id and a
// different method or parameters, we update the subscription and send
// the new request to the server. However, if the method and parameters
// are exactly the same as the last call, we simply return the cached
// subscription without sending anything to the server.
const request = this.requests.get(conf.subscriptionId);
if (!this.isOpen() || !request || request.method !== conf.method || !_.isEqual(request.params, conf.params)) {
this.connectAndSend(conf);
}
return results;
};
/**
* This method takes a number of subscription ids and cancels them on the
* server and discards them from our internal data structures.
*/
unsubscribe = (...params) => {
for (let subscriptionId of params) {
let subscription = this.subscriptions.get(subscriptionId);
if (subscription) {
subscription.complete();
}
this.requests.delete(subscriptionId);
this.subscriptions.delete(subscriptionId);
}
if (this.isOpen() && params.length) {
let message = { action: 'unsubscribe', client: params };
if (this.showDebugging) {
console.log('sending WebSocket RPC unsubscribe action', message);
}
this.ws.send(JSON.stringify(message));
}
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment