@endash
Last active November 3, 2015 16:53
Angular 2 alpha 39 `rx` async pipe, based on RxJS 4.0. NOTE: removing the `observeOn` call works if an observable emits asynchronously, but breaks for any observable that emits synchronously.
/**
 * Creates a pipe suitable for an RxJS observable:
 *
 * @View({
 *   template: '{{ someObservable | rx }}',
 *   pipes: [RxPipe]
 * })
 *
 * Originally written by @gdi2290, then updated for 2.0.0.alpha-35 to extend AsyncPipe.
 * (Soon the Angular team will be using RxJS natively and this pipe will be
 * unnecessary because we'll be able to use the `async` pipe.)
 *
 * References:
 * * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290
 * * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts
 *
 * @class
 */
// Only the symbols actually used below are imported.
import {Pipe, ChangeDetectorRef, NgZone, AsyncPipe} from "angular2/angular2";
import {Observable, Scheduler} from "rx";
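
// Duck-type check: anything with a subscribe() method is treated as an observable.
function isObservable(obs: any): boolean {
  return !!obs && typeof obs.subscribe === "function";
}

class RxStrategy {
  constructor(public _zone: NgZone) {}

  createSubscription(async: any, updateLatestValue: any): any {
    // observeOn(Scheduler.async) defers delivery; per the note above, dropping it
    // breaks observables that emit synchronously.
    // _zone.run re-enters the Angular zone before the pipe's latest value is updated.
    return async.observeOn(Scheduler.async).subscribe(
      (val) => { this._zone.run(() => { updateLatestValue(val); }); },
      (e: Error) => { throw e; }
    );
  }

  dispose(subscription: any): void { subscription.dispose(); }
  onDestroy(subscription: any): void { subscription.dispose(); }
}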

@Pipe({name: "rx", pure: false})
export class RxPipe extends AsyncPipe {
  constructor(public _ref: ChangeDetectorRef, public _zone: NgZone) { super(_ref); }

  supports(obs: any): boolean { return isObservable(obs); }

  _selectStrategy(obj: Observable<any>): any {
    return new RxStrategy(this._zone);
  }
}
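
@Pipe({
  name: 'get'
})
export class GetPipe {
  // Returns the property named by the first pipe argument,
  // or undefined when the input value is falsy.
  transform(value: any, args: any[]): any {
    if (value) { return value[args[0]]; }
  }
}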
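
The `get` pipe above only looks up one property on its input, which suggests it is meant to be chained after `rx`, for example `{{ someObservable | rx | get:'name' }}` (that template is an assumption, not something the gist shows). A minimal sketch of the same lookup as a plain function, without the Angular decorator; `getProperty` and the sample object are hypothetical names used only for illustration:

```typescript
// Hypothetical plain-function equivalent of GetPipe.transform (no Angular needed).
function getProperty(value: any, args: any[]): any {
  // Guard against a falsy input, e.g. when no value is available yet.
  if (value) {
    return value[args[0]];
  }
  return undefined;
}

// Sample data, invented for this sketch.
const sampleUser = { name: "Ada", id: 7 };

const fromObject = getProperty(sampleUser, ["name"]); // "Ada"
const fromNull = getProperty(null, ["name"]);         // undefined
```

The guard matters because a falsy input would otherwise cause a property access on `null` or `undefined` and throw.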
Xegyn commented Nov 3, 2015

On alpha 45 I was getting the following TypeScript error on line 29 (the `return` statement in `createSubscription`):
Property 'async' does not exist on type '{ nextTick: NextTickScheduler; immediate: ImmediateScheduler; }'.

I removed the `.observeOn` call, which fixed the TypeScript error, and the pipe worked correctly in the browser, but I then got the following console error: `LifeCycle.tick is called recursively`

Removing the `this._zone.run` wrapper got rid of the console error.

I'm not sure what the implications of my change are, but changing line 29 to

    return async.subscribe((val) => { updateLatestValue(val); }, (e: Error) => { throw e; });

seems to work.
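
For reference, the change described in this comment can be sketched as a standalone strategy class. This is only a sketch under stated assumptions: `Disposable`, `Subscribable`, `PlainRxStrategy`, and the fake observable are hypothetical stand-ins for the RxJS 4 shapes the gist relies on, so the block runs without Angular or RxJS installed.

```typescript
// Hypothetical structural types standing in for the RxJS 4 shapes used by the gist.
interface Disposable { dispose(): void; }
interface Subscribable<T> {
  subscribe(onNext: (value: T) => void, onError?: (e: Error) => void): Disposable;
}

// Strategy without observeOn(...) and without NgZone.run(...), as suggested in the comment.
class PlainRxStrategy {
  createSubscription<T>(source: Subscribable<T>, updateLatestValue: (value: T) => void): Disposable {
    return source.subscribe(
      (val) => { updateLatestValue(val); },
      (e: Error) => { throw e; }
    );
  }
  dispose(subscription: Disposable): void { subscription.dispose(); }
  onDestroy(subscription: Disposable): void { subscription.dispose(); }
}

// Fake observable (invented for this sketch) that emits 1, 2, 3 synchronously on subscribe.
let disposeCount = 0;
const fakeSource: Subscribable<number> = {
  subscribe(onNext) {
    [1, 2, 3].forEach((v) => onNext(v));
    return { dispose() { disposeCount++; } };
  },
};

const seenValues: number[] = [];
const plainStrategy = new PlainRxStrategy();
const plainSubscription = plainStrategy.createSubscription(fakeSource, (v) => { seenValues.push(v); });
plainStrategy.dispose(plainSubscription);
```

Note that the fake source delivers its values synchronously, before `createSubscription` returns. According to the gist description, that is exactly the case the original `observeOn` call was there to handle, so this simplified version may still misbehave inside the real pipe for observables that emit synchronously.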