Skip to content

Instantly share code, notes, and snippets.

@GFoley83
Last active June 2, 2020 12:55
Show Gist options
  • Save GFoley83/ac0004796972036b221ffbeb67aa60d9 to your computer and use it in GitHub Desktop.
Save GFoley83/ac0004796972036b221ffbeb67aa60d9 to your computer and use it in GitHub Desktop.
Angular 2 Message Bus / PubSub ex.
import { Injectable } from "@angular/core";
import { ReplaySubject, Observable } from "rxjs/Rx";
interface Message {
channel: string;
data: any;
}
@Injectable()
export class MessageBus {
public channelSubjects: { channel: string, subject: ReplaySubject<Message> }[];
constructor() {
this.channelSubjects = [];
}
private getTypeSub(channel: string) {
let existingChannelSubject = this.channelSubjects.find(x => x.channel === channel);
if (existingChannelSubject == null) {
existingChannelSubject = { channel: channel, subject: new ReplaySubject<Message>(1) };
this.channelSubjects.push(existingChannelSubject);
}
return existingChannelSubject;
}
public publish<T>(message: T): void {
const channel = Array.isArray(message) ? (<any>message[0].constructor).name + '[]' : (<any>message.constructor).name
const existingTypeSub = this.getTypeSub(channel);
existingTypeSub.subject.next({ channel: channel, data: message });
}
public listenFor<T>(messageType: { new (...args: any[]): T }): Observable<T>
public listenFor<T>(messageType: { new (...args: any[]): T }[]): Observable<T[]>
public listenFor<T>(messageType: { new (...args: any[]): T } | { new (...args: any[]): T }[]): Observable<T> | Observable<T[]> {
const channel = Array.isArray(messageType) ? (<any>messageType[0]).name + '[]' : (<any>messageType).name
const existingTypeSub = this.getTypeSub(channel);
return existingTypeSub.subject.map(m => m.data);
}
}
@vlodko
Copy link

vlodko commented Feb 15, 2018

@gioragutt good question. Why would I need ReplaySubject here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment