@jashmenn
Created August 21, 2015 00:24
RxJS Support for Angular 2 Async Pipes - (angular-2.0.0.alpha-35 and above)
/// <reference path="../../typings/app.d.ts" />
//
// Creates a pipe suitable for a RxJS observable:
//
// @View({
// template: '{{ someObservable | rx }}',
// pipes: [RxPipe]
// })
//
// Originally written by @gdi2290, updated for 2.0.0.alpha-35 to build on AsyncPipe.
// (Soon the Angular team will be using RxJS natively and this pipe will be
// unnecessary because we'll be able to use the `async` pipe directly.)
//
// References:
// * rxPipeRegistry.ts https://gist.github.com/gdi2290/e9b2880a1d13057197d7 by @gdi2290
// * AsyncPipe https://github.com/angular/angular/blob/master/modules/angular2/src/pipes/async_pipe.ts
import {PipeFactory, Pipe, Injectable, bind, ChangeDetectorRef} from "angular2/angular2";
import {AsyncPipe} from "angular2/pipes";
import * as Rx from 'rx';
import {Observable} from 'rx';
function isObservable(obs) {
  return obs && typeof obs.subscribe === 'function';
}

class RxStrategy {
  createSubscription(async: any, updateLatestValue: any): any {
    return async.subscribe(updateLatestValue, e => { throw e; });
  }
  dispose(subscription: any): void { subscription.dispose(); }
  onDestroy(subscription: any): void { subscription.dispose(); }
}

var _rxStrategy = new RxStrategy();

@Pipe({name: 'rx'})
export class RxPipe extends AsyncPipe {
  constructor(public _ref: ChangeDetectorRef) { super(_ref); }
  supports(obs) { return isObservable(obs); }
  _selectStrategy(obj: Observable<any>): any {
    return _rxStrategy;
  }
}

export var rxPipeInjectables: Array<any> = [
  bind(RxPipe).toValue(RxPipe)
];
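
The strategy contract used by the pipe can be tried out without Angular. What follows is only a minimal sketch: `FakeObservable` and `Disposable` are hypothetical stand-ins (they are not Angular or RxJS types) for an observable whose subscriptions expose `dispose()`, and the last few lines imitate, roughly, what a pipe would do with the strategy.

```typescript
// Hypothetical stand-ins for illustration; not part of Angular or RxJS.
interface Disposable { dispose(): void; }

class FakeObservable<T> {
  private observers: Array<(value: T) => void> = [];

  subscribe(onNext: (value: T) => void, _onError?: (e: Error) => void): Disposable {
    this.observers.push(onNext);
    return {
      // Disposing removes this observer so later values are not delivered.
      dispose: () => {
        this.observers = this.observers.filter(o => o !== onNext);
      }
    };
  }

  // Push a value to every current subscriber.
  emit(value: T): void {
    this.observers.slice().forEach(o => o(value));
  }
}

// Same shape as the RxStrategy in the gist, typed against the stand-ins.
class RxStrategy {
  createSubscription(observable: FakeObservable<any>, updateLatestValue: (v: any) => void): Disposable {
    return observable.subscribe(updateLatestValue, e => { throw e; });
  }
  dispose(subscription: Disposable): void { subscription.dispose(); }
  onDestroy(subscription: Disposable): void { subscription.dispose(); }
}

const source = new FakeObservable<number>();
const strategy = new RxStrategy();
const received: number[] = [];

// Subscribe through the strategy and record every value that arrives.
const subscription = strategy.createSubscription(source, v => { received.push(v); });
source.emit(1);
source.emit(2);

// After dispose, further emissions no longer reach the callback.
strategy.dispose(subscription);
source.emit(3);
```

With these stand-ins, `received` ends up holding only the values emitted while the subscription was live.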
@robwormald

nice one jashmenn, most helpful!

@endash
endash commented Sep 28, 2015

To get this to work in alpha37, change line 39 (the `RxPipe` constructor) to

    constructor(@Inject(ChangeDetectorRef) public _ref: ChangeDetectorRef) { super(_ref); }

(After importing Inject.) The manual bind().toValue() no longer seems necessary.

_Edit_ The above stops the errors, but it doesn't necessarily work (it may still appear to work as a side effect). Change detection doesn't automatically kick off again after the Observable fires. Here's what I've done, which could be cleaned up a bit:

  • Import NgZone, and Rx.Scheduler

  • Inject NgZone into the pipe. Pass it to the strategy, either in the constructor or by setting a property

  • Change line 29 (the `return` in `createSubscription`) to

        return async.observeOn(Scheduler.timeout).subscribe((val) => { this._zone.run(() => { updateLatestValue(val); }); }, (e: Error) => { throw e; });
    

_Note_ The combined use of observeOn/NgZone is to regularize behaviour regardless of whether an observable is asynchronous or synchronous. Otherwise, coding in expectation of a synchronous observable and in fact receiving an asynchronous observable, or vice-versa, can result in anomalous behaviour.
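
A rough, Angular-free sketch of that idea follows. Everything in it is a hypothetical stand-in: `ManualScheduler` plays the role of `Scheduler.timeout` (as a queue flushed by hand, so the ordering is deterministic), `FakeZone` plays the role of `NgZone`, and `subscribeDeferredInZone` mirrors the `observeOn(...).subscribe(... zone.run(...))` chain. It only illustrates the ordering, not the real APIs.

```typescript
// Hypothetical stand-ins for illustration; not the Angular or RxJS APIs.
type Task = () => void;

// A hand-flushed task queue standing in for Scheduler.timeout.
class ManualScheduler {
  private queue: Task[] = [];
  schedule(task: Task): void { this.queue.push(task); }
  flush(): void {
    while (this.queue.length > 0) {
      const task = this.queue.shift()!;
      task();
    }
  }
}

// Stands in for NgZone: counts callbacks that ran "inside the zone".
class FakeZone {
  runs = 0;
  run(fn: () => void): void { this.runs++; fn(); }
}

type Subscribe<T> = (onNext: (value: T) => void) => void;

// Analogue of observeOn(scheduler) plus zone.run(...) in the subscriber.
function subscribeDeferredInZone<T>(
  source: Subscribe<T>,
  scheduler: ManualScheduler,
  zone: FakeZone,
  updateLatestValue: (value: T) => void
): void {
  source(value => scheduler.schedule(() => zone.run(() => updateLatestValue(value))));
}

const scheduler = new ManualScheduler();
const zone = new FakeZone();
const log: string[] = [];

// A synchronous source emits before the subscribe call returns.
const syncSource: Subscribe<string> = onNext => onNext("sync");
// An asynchronous source emits only when the scheduler later runs its task.
const asyncSource: Subscribe<string> = onNext => scheduler.schedule(() => onNext("async"));

syncSource(v => log.push("direct:" + v)); // unwrapped: delivered immediately
subscribeDeferredInZone(syncSource, scheduler, zone, v => log.push("deferred:" + v));
subscribeDeferredInZone(asyncSource, scheduler, zone, v => log.push("deferred:" + v));
log.push("subscribed");

scheduler.flush();
```

In this sketch the unwrapped synchronous source delivers before `"subscribed"` is logged, while both wrapped sources deliver only after the flush and always through `zone.run`, so the caller sees the same ordering either way.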


Here's a working pipe for alpha38 (using rx 4.0): https://gist.github.com/endash/776938fad064a7fd88de
