Created
August 15, 2018 07:31
-
-
Save erikvullings/f0830d9381ab3d25ef0908e8c8fb6317 to your computer and use it in GitHub Desktop.
Message bus service, or pub/sub service, loosely based on postal.js, but created in TypeScript.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export type ICallback<T> = (data: T, envelope: IEnvelope<T>) => void; | |
/** Message enveloppe */ | |
export interface IEnvelope<T> { | |
/* Uses DEFAULT_CHANNEL if no channel is provided */ | |
channel?: string; | |
/** Topic name */ | |
topic: string; | |
/** Payload */ | |
data?: T; | |
} | |
export interface ISubscriptionDefinition<T> { | |
/** Channel name */ | |
channel: string; | |
/** Topic name, which is either a string, a # (will always be called), or a regex pattern. */ | |
topic: string; | |
/** Callback to invoke */ | |
callback: ICallback<T>; | |
unsubscribe(): void; | |
} | |
export interface IChannelDefinition<T> { | |
/** Name of the channel */ | |
channel: string; | |
/** Subscribe to a topic. A topic can be a string, a # (all), or a regex pattern (as string) */ | |
subscribe(topic: string, callback: ICallback<T>): ISubscriptionDefinition<T>; | |
/** Publish to a topic */ | |
publish(topic: string, data?: T): void; | |
} | |
export interface IPubSub { | |
/** | |
* Get the channel, which you can use to subscribe or for publising messages. | |
* @param channelName channel name. | |
*/ | |
channel<T>(channelName?: string): IChannelDefinition<T>; | |
/** | |
* Publish directly to a channel and topic, without getting the channel first. | |
* @param msg The message you want to publish. | |
*/ | |
publish<T>(msg: { channel: string; topic: string; data: T }): void; | |
} | |
/** | |
* Publish/subscribe in-memory message bus, loosely based on postal.js (https://www.npmjs.com/package/postal). | |
* | |
*/ | |
class MessageBusService implements IPubSub { | |
private channels: { [channel: string]: IChannelDefinition<any> } = {}; | |
/** | |
* Create a new channel, or get an existing one. | |
* @param channelName If no channel name is supplied, 'DEFAULT_CHANNEL' will be used. | |
*/ | |
public channel<T>(channelName: string = 'DEFAULT_CHANNEL') { | |
if (!this.channels.hasOwnProperty(channelName)) { | |
const newChannel = this.createChannel<T>(channelName); | |
this.channels[channelName] = newChannel; | |
} | |
return this.channels[channelName] as IChannelDefinition<T>; | |
} | |
/** | |
* Directly publish a message, without first obtaining the channel. | |
* @param msg: Channel | |
*/ | |
public publish<T>({ channel, topic, data }: { channel: string; topic: string; data: T }) { | |
this.channel(channel).publish(topic, data); | |
} | |
private createChannel<T>(channel: string): IChannelDefinition<T> { | |
const topics: { [topic: string]: Array<ICallback<T>> } = {}; | |
const createSubscriber = () => (topic: string, callback: ICallback<T>) => { | |
if (!topics.hasOwnProperty(topic)) { | |
topics[topic] = []; | |
} | |
const listeners = topics[topic]; | |
listeners.push(callback); | |
return { | |
channel, | |
topic, | |
callback, | |
unsubscribe: () => { | |
const callbackIndex = listeners.indexOf(callback); | |
if (callbackIndex > -1) { listeners.splice(callbackIndex, 1); } | |
}, | |
} as ISubscriptionDefinition<T>; | |
}; | |
const createPublisher = () => (topic: string, data: T) => { | |
const byTopic = (key: string) => topic === '#' || topic.startsWith(key) || /key/.test(topic); | |
const toListeners = (key: string) => topics[key]; | |
const envelope = { channel, topic, data }; | |
Object.keys(topics) | |
.filter(byTopic) | |
.map(toListeners) | |
.reduce((p, c) => [ ...p, ...c ], []) | |
.forEach(cb => cb(data, envelope)); | |
}; | |
return { | |
channel, | |
subscribe: createSubscriber(), | |
publish: createPublisher(), | |
}; | |
} | |
// scenarioChannel: new EventEmitter<{ old: Readonly<IScenario>; cur: Readonly<IScenario> }>(), | |
// scenariosChannel: new EventEmitter<{ list: ReadonlyArray<IScenario> }>(), | |
// objectiveChannel: new EventEmitter<{ old: Readonly<IObjective>; cur: Readonly<IObjective> }>(), | |
// objectivesChannel: new EventEmitter<{ list: ReadonlyArray<IObjective> }>(), | |
} | |
export const messageBus = new MessageBusService(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment