Skip to content

Instantly share code, notes, and snippets.

@endash
Last active October 13, 2015 20:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save endash/0aefb287c6901c86ac6a to your computer and use it in GitHub Desktop.
Save endash/0aefb287c6901c86ac6a to your computer and use it in GitHub Desktop.
RxJS Observable pipe for Angular 2 Alpha 0.40
/**
* Creates a pipe suitable for an RxJS observable:
*
* @View({
* template: '{{ someObservable | rx }}',
* pipes: [RxPipe]
* })
*
* Originally written by @gdi2290, then updated for 2.0.0.alpha-35 to build on AsyncPipe.
* (Soon the Angular team will be using RxJS natively and this pipe will be
* unnecessary because we'll be able to use the `async` pipe.)
*
* References:
* * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290
* * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts
*
* @class
*/
import {Pipe, bind, ChangeDetectorRef, Inject, Injectable, NgZone, AsyncPipe} from "angular2/angular2";
import {Observable, Scheduler} from "rx";
/**
 * Reports whether a value looks like an observable, i.e. it exposes a
 * callable `subscribe` member.
 *
 * Always yields a real boolean. A bare `obs && ...` expression would hand
 * back the falsy operand itself (`null`, `undefined`, `0`, `""`), which
 * contradicts the declared `boolean` return type.
 *
 * @param obs - Any value, including `null` or `undefined`.
 * @returns `true` when `obs.subscribe` is a function, otherwise `false`.
 */
function isObservable(obs: any): boolean {
  // `!= null` rejects both null and undefined before the property access.
  return obs != null && typeof obs.subscribe === "function";
}
/**
 * Subscription strategy for Rx observables: knows how to subscribe to a
 * source and how to tear the resulting subscription down again.
 */
class RxStrategy {
  /**
   * @param _zone - Zone in which every value update callback is executed.
   */
  constructor(public _zone: NgZone) {}

  /**
   * Subscribes to `async` after routing it through
   * `observeOn(Scheduler.async)`.
   *
   * @param async - Source exposing `observeOn` and `subscribe`.
   * @param updateLatestValue - Callback invoked with each emitted value.
   * @returns Whatever `subscribe` returns (the subscription handle).
   */
  createSubscription(async: any, updateLatestValue: any): any {
    const scheduled = async.observeOn(Scheduler.async);

    // Each value is handed over inside the zone — presumably so Angular's
    // change detection notices the update (confirm against NgZone docs).
    const deliverInZone = (value: any): void => {
      this._zone.run(() => {
        updateLatestValue(value);
      });
    };

    // Errors from the source are not swallowed; they are rethrown as-is.
    const rethrow = (e: Error): never => {
      throw e;
    };

    return scheduled.subscribe(deliverInZone, rethrow);
  }

  /** Releases the given subscription by calling its `dispose()` method. */
  dispose(subscription: any): void {
    subscription.dispose();
  }

  /** Releases the given subscription by calling its `dispose()` method. */
  onDestroy(subscription: any): void {
    subscription.dispose();
  }
}
/**
 * Impure pipe registered under the name `rx`. It extends AsyncPipe and
 * plugs in RxStrategy as the subscription strategy.
 */
@Pipe({name: "rx", pure: false})
export class RxPipe extends AsyncPipe {
  /** Change detector reference, also forwarded to the AsyncPipe constructor. */
  public _ref: ChangeDetectorRef;

  /** Zone passed on to every RxStrategy this pipe creates. */
  public _zone: NgZone;

  constructor(_ref: ChangeDetectorRef, _zone: NgZone) {
    super(_ref);
    this._ref = _ref;
    this._zone = _zone;
  }

  /**
   * @param obs - Candidate input value for the pipe.
   * @returns The result of `isObservable(obs)`.
   */
  supports(obs: any): boolean {
    return isObservable(obs);
  }

  /**
   * Builds a fresh RxStrategy bound to this pipe's zone.
   *
   * @param obj - The observable being piped; not consulted here.
   */
  _selectStrategy(obj: Observable<any>): any {
    const strategy = new RxStrategy(this._zone);
    return strategy;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment