Skip to content

Instantly share code, notes, and snippets.

@jamesmoey
Created March 22, 2016 20:35
Show Gist options
  • Save jamesmoey/4fe35c8f42707c183570 to your computer and use it in GitHub Desktop.
Save jamesmoey/4fe35c8f42707c183570 to your computer and use it in GitHub Desktop.
Rabbit.js as an RxJS Observable in TypeScript
/// <reference path="../typings/tsd.d.ts" />
/// <reference path="../node_modules/rx/ts/rx.all.d.ts" />
import * as Rx from 'rx';
import * as rabbit from 'rabbit.js';
export module Messaging {
/**
 * Socket kinds available for the sending side (see `IPubOpts.socketType`).
 *
 * Each member carries a string value rather than a number. The `as any` cast
 * is what lets a string initializer through the compiler: the member becomes
 * a computed member, so at runtime `PubSocketType.PUBLISH === 'PUB'`.
 */
export enum PubSocketType {
  PUBLISH = 'PUB' as any,
  PUSH = 'PUSH' as any,
  REQUEST = 'REQUEST' as any,
}
/**
 * Socket kinds available for the receiving side (see `ISubOpts.socketType`).
 * `subscribe` hands the selected value to `context.socket(...)` unchanged.
 *
 * Each member carries a string value rather than a number. The `as any` cast
 * is what lets a string initializer through the compiler: the member becomes
 * a computed member, so at runtime `SubSocketType.SUBSCRIBE === 'SUB'`.
 */
export enum SubSocketType {
  SUBSCRIBE = 'SUB' as any,
  PULL = 'PULL' as any,
  REPLY = 'REPLY' as any,
  WORKER = 'WORKER' as any,
}
/**
 * Values accepted for the optional `routing` setting in `IOpts`; `subscribe`
 * forwards the chosen value as the `routing` socket option.
 *
 * Each member carries a string value rather than a number. The `as any` cast
 * is what lets a string initializer through the compiler: the member becomes
 * a computed member, so at runtime `RoutingType.FANOUT === 'fanout'`.
 */
export enum RoutingType {
  FANOUT = 'fanout' as any,
  DIRECT = 'direct' as any,
  TOPIC = 'topic' as any,
}
/**
 * Settings common to both socket directions.
 *
 * Exported (together with the two derived interfaces below) so that code
 * outside this module can name the parameter type of the exported
 * `subscribe` function, and so declaration emit does not trip over a
 * private name in an exported signature.
 */
export interface IOpts {
  /** Broker address; `subscribe` passes it to `rabbit.createContext`. */
  uri: string;
  /** First argument given to `socket.connect` for every topic. */
  queue: string;
  /** Optional value for the socket's `routing` option. */
  routing?: RoutingType;
  /** Topics to connect with; `subscribe` issues one `connect` call per entry. */
  topic: string[];
}

/** Options for a sending socket: the common settings plus the socket kind. */
export interface IPubOpts extends IOpts {
  socketType: PubSocketType;
}

/** Options accepted by `subscribe`: the common settings plus the socket kind. */
export interface ISubOpts extends IOpts {
  socketType: SubSocketType;
}
/**
 * Wraps a rabbit.js receiving socket in an Rx observable.
 *
 * Nothing happens until the observable is subscribed to. Each subscription
 * creates its own rabbit.js context for `opts.uri`; once the context reports
 * 'ready', a socket of kind `opts.socketType` is opened and connected to
 * `opts.queue` once per entry of `opts.topic`. When `opts.topic` is missing
 * or empty the socket is connected once without a topic, instead of never
 * being connected at all.
 *
 * Event mapping:
 *  - socket 'data'   -> onNext(chunk)
 *  - socket 'error'  -> onError(err)
 *  - context 'error' -> onError(err)
 *  - socket 'end'    -> onCompleted()
 *
 * @param opts Connection, routing and socket-kind settings.
 * @returns An observable of received chunks. The element type `T` is chosen
 *          by the caller and is not validated at runtime. Disposing the
 *          subscription closes the context.
 */
export function subscribe<T>(opts: ISubOpts): Rx.Observable<T> {
  return Rx.Observable.create<T>((observer: Rx.IObserver<any>) => {
    const context: rabbit.Context = rabbit.createContext(opts.uri);
    // Set by the disposer so a late 'ready' does not open a socket on a
    // context that is already being closed.
    let disposed = false;

    // Without this listener a connection failure never reaches the observer
    // and surfaces as an unhandled 'error' event on the context instead.
    context.on('error', err => observer.onError(err));

    context.on('ready', () => {
      if (disposed) {
        return;
      }
      console.log('connecting to ', opts.uri);
      const socket = context.socket<rabbit.SubSocket>(<any>opts.socketType, { routing: opts.routing });
      socket.on('data', chunk => observer.onNext(chunk));
      socket.on('error', err => observer.onError(err));
      socket.on('end', () => observer.onCompleted());

      if (opts.topic && opts.topic.length > 0) {
        opts.topic.forEach(topic => socket.connect(opts.queue, topic));
      } else {
        // No topics supplied: still attach to the queue so data can flow.
        socket.connect(opts.queue);
      }
    });

    return () => {
      disposed = true;
      return context.close(() => {});
    };
  });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment