Skip to content

Instantly share code, notes, and snippets.

@alfredlopez
Forked from benorama/EventBusService.ts
Last active December 31, 2018 13:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alfredlopez/a41cd9cca52f989b2de444c89ef0f0e2 to your computer and use it in GitHub Desktop.
Save alfredlopez/a41cd9cca52f989b2de444c89ef0f0e2 to your computer and use it in GitHub Desktop.
TypeScript version of the Vert.x EventBus client for Aurelia
/**
* Based on Vertx EventBus Client (https://github.com/vert-x3/vertx-bus-bower)
* and EventBusService.ts for Angular by Benoit Hediard (@benorama)
* Requires SockJS Client
*/
import {inject} from 'aurelia-framework';
import {EventAggregator} from 'aurelia-event-aggregator';
import * as SockJS from 'sockjs-client';
@inject(EventAggregator)
export class EventBusService {
    /** True once an instance has been constructed; the constructor refuses to create a second one. */
    static initialized: boolean = false;
    /** Maximum number of operations buffered while the socket is not open; the oldest are dropped first. */
    static MAX_EVENT_QUEUE_SIZE: number = 100;

    // Connection states stored in `state`.
    static STATE_CONNECTING: number = 0;
    static STATE_OPEN: number = 1;
    static STATE_CLOSING: number = 2;
    static STATE_CLOSED: number = 3;

    // `type` values: publish/send/register/unregister are written to outgoing frames,
    // while the *_HANDLER variants only tag operations held in the local event queue.
    static TYPE_PUBLISH: string = 'publish';
    static TYPE_SEND: string = 'send';
    static TYPE_REGISTER: string = 'register';
    static TYPE_REGISTER_HANDLER: string = 'registerHandler';
    static TYPE_UNREGISTER: string = 'unregister';
    static TYPE_UNREGISTER_HANDLER: string = 'unregisterHandler';

    /** Headers merged into every outgoing frame; set by `connect`. */
    private defaultHeaders: any;
    /** Operations requested while not connected; created by `connect`, replayed when the socket opens. */
    private eventQueue: QueuedEvent[];
    /** Address -> array of handler callbacks. */
    private handlers: any = {};
    /** Reply address -> one-shot reply callback. */
    private replyHandlers: any = {};
    private sockJS;
    private state: number;
    private ea: EventAggregator;

    /**
     * @param ea aggregator used to publish the connection lifecycle events
     * @throws Error when an instance has already been constructed
     */
    constructor(ea: EventAggregator) {
        this.ea = ea;
        if (EventBusService.initialized) {
            throw new Error('Only one vertx eventBus can exist per application.');
        }
        EventBusService.initialized = true;
    }

    /** Whether the underlying socket is currently open. */
    get connected(): boolean {
        return this.state === EventBusService.STATE_OPEN;
    }

    /**
     * Sets up and connects to the EventBus. Any previously registered handlers,
     * pending reply handlers and queued events are discarded. Clients of this
     * class can subscribe to EventAggregator events: 'event-bus-opened',
     * 'event-bus-closed', 'event-bus-error'.
     *
     * @param url address handed to the SockJS constructor
     * @param defaultHeaders headers merged into every outgoing frame
     * @param options passed to SockJS; `vertxbus_ping_interval` (milliseconds,
     *        default 5000) controls how often a ping frame is sent
     */
    connect(url: string, defaultHeaders: any = null, options: any = {}): void {
        let pingInterval = options.vertxbus_ping_interval || 5000;
        let pingTimerID;

        this.defaultHeaders = defaultHeaders;
        this.eventQueue = [];
        this.handlers = {};
        this.replyHandlers = {};
        this.sockJS = new SockJS(url, null, options);
        this.state = EventBusService.STATE_CONNECTING;

        let sendPing = () => {
            this.sockJS.send(JSON.stringify({type: 'ping'}));
        };

        // Shape handed to handlers as their first argument when an 'err' frame arrives.
        let toFailure = (frame: any) => ({
            failureCode: frame.failureCode,
            failureType: frame.failureType,
            message: frame.message
        });

        this.sockJS.onopen = () => {
            this.state = EventBusService.STATE_OPEN;
            // Send the first ping, replay what was queued while connecting,
            // then keep pinging every pingInterval milliseconds.
            sendPing();
            this.flushEventQueue();
            pingTimerID = setInterval(sendPing, pingInterval);
            this.ea.publish('event-bus-opened');
        };

        this.sockJS.onclose = () => {
            this.state = EventBusService.STATE_CLOSED;
            if (pingTimerID) clearInterval(pingTimerID);
            this.ea.publish('event-bus-closed');
        };

        this.sockJS.onmessage = (e) => {
            let json = JSON.parse(e.data);

            // Define a reply function on the message itself. An arrow function is
            // required so that `this` is the service and not the message object.
            if (json.replyAddress) {
                Object.defineProperty(json, 'reply', {
                    value: (message: any, headers?: any, callback?: Function) => {
                        // Allow reply(message, callback) without headers.
                        if (typeof headers === 'function' && !callback) {
                            callback = headers;
                            headers = undefined;
                        }
                        // send() takes the reply handler before the headers.
                        this.send(json.replyAddress, message, callback, headers);
                    }
                });
            }

            let registered = this.handlers[json.address];
            if (registered) {
                // Dispatch over a snapshot: a handler may unregister itself (or
                // others) while running, which splices the live array.
                registered.slice().forEach((handler) => {
                    if (json.type === 'err') {
                        handler(toFailure(json));
                    } else {
                        handler(null, json);
                    }
                });
            } else if (this.replyHandlers[json.address]) {
                // Might be a reply message; reply handlers fire at most once.
                let handler = this.replyHandlers[json.address];
                delete this.replyHandlers[json.address];
                if (json.type === 'err') {
                    handler(toFailure(json));
                } else {
                    handler(null, json);
                }
            } else if (json.type === 'err') {
                try {
                    console.error(json);
                } catch (ignored) {
                    // dev tools are disabled so we cannot use console on IE
                }
                // Published outside the try so that a missing console does not
                // suppress the error notification.
                this.ea.publish("event-bus-error", {message: json.body});
            } else {
                try {
                    console.warn('No handler found for message: ', json);
                } catch (ignored) {
                    // dev tools are disabled so we cannot use console on IE
                }
            }
        };
    }

    /** Closes the socket, if `connect` has created one. */
    disconnect() {
        if (this.sockJS) {
            this.state = EventBusService.STATE_CLOSING;
            this.sockJS.close();
        }
    }

    /**
     * Publish a message. While not connected the message is queued and
     * replayed once the socket opens.
     *
     * @param address destination address
     * @param body message payload
     * @param headers optional headers; they take precedence over the defaults
     */
    publish(address: string,
            body: any,
            headers?: any) {
        if (this.connected) {
            let message: any = {
                address: address,
                body: body,
                headers: this.mergeHeaders(this.defaultHeaders, headers),
                type: EventBusService.TYPE_PUBLISH
            };
            this.sockJS.send(JSON.stringify(message));
        } else {
            this.addEventToQueue({address, body, headers, type: EventBusService.TYPE_PUBLISH});
        }
    }

    /**
     * Send a message. While not connected the message is queued and replayed
     * once the socket opens.
     *
     * @param address destination address
     * @param body message payload
     * @param replyHandler optional callback invoked once as `(failure, message)`
     *        when a frame addressed to the generated reply address arrives
     * @param headers optional headers; they take precedence over the defaults
     */
    send<T>(address: string,
            body: any,
            replyHandler?: Function,
            headers?: any): void {
        if (this.connected) {
            let message: any = {
                address: address,
                body: body,
                headers: this.mergeHeaders(this.defaultHeaders, headers),
                type: EventBusService.TYPE_SEND
            };
            if (replyHandler) {
                let replyAddress = makeUUID();
                message.replyAddress = replyAddress;
                this.replyHandlers[replyAddress] = replyHandler;
            }
            this.sockJS.send(JSON.stringify(message));
        } else {
            this.addEventToQueue({
                address,
                handler: replyHandler,
                body,
                headers,
                type: EventBusService.TYPE_SEND
            });
        }
    }

    /**
     * Sends a 'register' frame for the address. Does nothing while not connected.
     *
     * @param address address to register
     * @param headers optional headers; they take precedence over the defaults
     */
    register<T>(address: string,
                headers?: any): void {
        if (this.connected) {
            let envelope: any = {
                address: address,
                headers: this.mergeHeaders(this.defaultHeaders, headers),
                type: EventBusService.TYPE_REGISTER
            };
            this.sockJS.send(JSON.stringify(envelope));
        }
    }

    /**
     * Register a new handler. The first handler for an address also registers
     * the address over the socket. While not connected the request is queued.
     *
     * @param address address to listen on
     * @param handler callback invoked as `(failure, message)`
     * @param headers optional headers for the 'register' frame
     */
    registerHandler<T>(address: string,
                       handler: Function,
                       headers?: any): void {
        if (this.connected) {
            // ensure it is an array
            if (!this.handlers[address]) {
                this.handlers[address] = [];
                // First handler for this address so we should register the connection
                this.register(address, headers);
            }
            this.handlers[address].push(handler);
        } else {
            this.addEventToQueue({address, handler, headers, type: EventBusService.TYPE_REGISTER_HANDLER});
        }
    }

    /**
     * Sends an 'unregister' frame for the address when connected and, in every
     * case, forgets all local handlers for it.
     *
     * @param address address to unregister
     * @param headers optional headers; they take precedence over the defaults
     */
    unregister<T>(address: string,
                  headers?: any): void {
        if (this.connected) {
            let envelope: any = {
                address: address,
                headers: this.mergeHeaders(this.defaultHeaders, headers),
                type: EventBusService.TYPE_UNREGISTER
            };
            this.sockJS.send(JSON.stringify(envelope));
        }
        delete this.handlers[address];
    }

    /**
     * Unregister a handler. Removing the last handler for an address also
     * unregisters the address over the socket. While not connected the
     * request is queued.
     *
     * @param address address the handler was registered on
     * @param handler the exact function passed to `registerHandler`
     * @param headers optional headers for the 'unregister' frame
     */
    unregisterHandler<T>(address: string,
                         handler: Function,
                         headers?: any): void {
        if (this.connected) {
            let handlers = this.handlers[address];
            if (handlers) {
                let idx = handlers.indexOf(handler);
                if (idx !== -1) {
                    handlers.splice(idx, 1);
                    if (handlers.length === 0) {
                        // No more local handlers so we should unregister the connection
                        this.unregister(address, headers);
                    }
                }
            }
        } else {
            // Tagged with the *_HANDLER type so that flushEventQueue replays it.
            this.addEventToQueue({address, handler, headers, type: EventBusService.TYPE_UNREGISTER_HANDLER});
        }
    }

    /**
     * Buffers an operation for replay. Protected so subclasses can reuse it
     * while applications cannot call it directly.
     *
     * Nothing is buffered before `connect` has created the queue. When the
     * queue exceeds MAX_EVENT_QUEUE_SIZE the oldest entries are discarded.
     */
    protected addEventToQueue(event) {
        if (!this.eventQueue) {
            return;
        }
        this.eventQueue.push(event);
        if (this.eventQueue.length > EventBusService.MAX_EVENT_QUEUE_SIZE) {
            // Remove oldest events from the queue
            this.eventQueue.splice(0, this.eventQueue.length - EventBusService.MAX_EVENT_QUEUE_SIZE);
        }
    }

    /**
     * Replays every buffered operation, oldest first, through the matching
     * public method. Does nothing while not connected.
     */
    protected flushEventQueue() {
        if (!this.connected) {
            return;
        }
        while (this.eventQueue.length > 0) {
            const event = this.eventQueue.shift();
            if (!event) {
                break;
            }
            switch (event.type) {
                case EventBusService.TYPE_PUBLISH:
                    this.publish(event.address, event.body, event.headers);
                    break;
                case EventBusService.TYPE_SEND:
                    this.send(event.address, event.body, event.handler, event.headers);
                    break;
                case EventBusService.TYPE_REGISTER_HANDLER:
                    this.registerHandler(event.address, event.handler, event.headers);
                    break;
                case EventBusService.TYPE_UNREGISTER_HANDLER:
                    this.unregisterHandler(event.address, event.handler, event.headers);
                    break;
            }
        }
    }

    /**
     * Combines the default headers with per-call headers into a new object.
     * A per-call value wins unless it is undefined. Neither argument is
     * modified, so the shared defaults can never be altered through the result.
     *
     * @returns a fresh object (empty when both arguments are missing)
     */
    protected mergeHeaders(defaultHeaders, headers) {
        const merged: any = {};
        const copyDefined = (source: any) => {
            if (!source) {
                return;
            }
            for (const headerName in source) {
                if (Object.prototype.hasOwnProperty.call(source, headerName)
                    && typeof source[headerName] !== 'undefined') {
                    merged[headerName] = source[headerName];
                }
            }
        };
        copyDefined(defaultHeaders);
        // user can overwrite the default headers
        copyDefined(headers);
        return merged;
    }
}
/**
 * An operation recorded in the event queue so it can be replayed later.
 * Only `address` and `type` are present on every entry; the remaining
 * fields depend on which operation was queued.
 */
interface QueuedEvent {
    /** Event bus address the operation targets. */
    address: string;
    /** Message payload. Only for PUBLISH/SEND events. */
    body?: any;
    /** Callback to attach. Only for REGISTER/SEND events. */
    handler?: Function;
    /** Optional headers to accompany the operation. */
    headers?: any;
    /** One of the EventBusService.TYPE_* values. */
    type: string;
}
/**
 * Builds a random identifier laid out like a version-4 UUID: lowercase hex
 * groups of 8-4-4-4-12, a literal '4' opening the third group and one of
 * 8, 9, a or b opening the fourth.
 *
 * Randomness comes from Math.random, so the result is not cryptographically
 * secure.
 */
function makeUUID() {
    const template = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx';
    return template.replace(/[xy]/g, (placeholder) => {
        // One random value in 0..15 per placeholder.
        const nibble = Math.floor(Math.random() * 16);
        // 'y' keeps only the two low bits and forces the high bit: 8..11.
        const digit = placeholder === 'y' ? (nibble & 3) | 8 : nibble;
        return digit.toString(16);
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment