Skip to content

Instantly share code, notes, and snippets.

@mariano-aguero
Forked from GFoley83/message-bus.service.ts
Created November 16, 2017 18:23
Show Gist options
  • Save mariano-aguero/7b24819c9b898cbe3b6115f2c6f59f9a to your computer and use it in GitHub Desktop.
Save mariano-aguero/7b24819c9b898cbe3b6115f2c6f59f9a to your computer and use it in GitHub Desktop.
Angular 2 Message Bus / PubSub ex.
import { Injectable } from "@angular/core";
import { ReplaySubject, Observable } from "rxjs/Rx";
interface Message {
channel: string;
data: any;
}
@Injectable()
export class MessageBus {
public channelSubjects: { channel: string, subject: ReplaySubject<Message> }[];
constructor() {
this.channelSubjects = [];
}
private getTypeSub(channel: string) {
let existingChannelSubject = this.channelSubjects.find(x => x.channel === channel);
if (existingChannelSubject == null) {
existingChannelSubject = { channel: channel, subject: new ReplaySubject<Message>(1) };
this.channelSubjects.push(existingChannelSubject);
}
return existingChannelSubject;
}
public publish<T>(message: T): void {
const channel = Array.isArray(message) ? (<any>message[0].constructor).name + '[]' : (<any>message.constructor).name
const existingTypeSub = this.getTypeSub(channel);
existingTypeSub.subject.next({ channel: channel, data: message });
}
public listenFor<T>(messageType: { new (...args: any[]): T }): Observable<T>
public listenFor<T>(messageType: { new (...args: any[]): T }[]): Observable<T[]>
public listenFor<T>(messageType: { new (...args: any[]): T } | { new (...args: any[]): T }[]): Observable<T> | Observable<T[]> {
const channel = Array.isArray(messageType) ? (<any>messageType[0]).name + '[]' : (<any>messageType).name
const existingTypeSub = this.getTypeSub(channel);
return existingTypeSub.subject.map(m => m.data);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment