Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import {Injectable} from "@angular/core";
import {environment} from "app/../environments/environment";
import {Observable, ReplaySubject, Subject} from "rxjs";
const SockJS = require('sockjs-client');
const Stomp = require('stompjs');
// TODO : Find sockJs / StompJs typings
interface StompClient {
connect(params: any, connectHandler: (frame: Frame) => void, errorHandler: (error: any) => void)
subscribe(topic: string, handler: (value: Frame) => void)
}
interface Frame {
body: string;
headers: Map<string,any>
command: string
}
@Injectable()
export class StompBroker {
stompClient: Subject<StompClient>;
constructor() {
// Use a replay subject, so that when completed, the
// subscribe() continues to return the most recent value
this.stompClient = new ReplaySubject(1);
let router = this;
console.info("Connecting websocket");
let socket = new SockJS(`${environment.serverUrl}/push`);
let _stompClient: StompClient = Stomp.over(socket);
_stompClient.connect({}, frame => {
console.log('Connected: ' + frame);
router.stompClient.next(_stompClient);
}, error => {
console.log('Stomp connection failed: ', error);
});
}
subscribe(topic: string): Observable < any > {
let that = this;
return Observable.create(function (observer) {
console.log(`Subscribing to ${topic}`);
that.stompClient.subscribe({
next: (stompClient: StompClient) => {
stompClient.subscribe(topic, (message => {
console.log("Received message over websocket: ", message);
observer.next(JSON.parse(message.body));
}))
}
});
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.