<!doctype html> <html> <head> <meta charset="utf-8" /> <title> Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2 </title> <link rel="stylesheet" type="text/css" href="./demo.css"></link> </head> <body> <h1> Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2 </h1> <p> <a href="./?1">Stream 1</a>: Hot, interval, custom.<br /> <a href="./?2">Stream 2</a>: Hot, interval, publish.<br /> <a href="./?3">Stream 3</a>: Hot, interval, refCount.<br /> <a href="./?4">Stream 4</a>: Cold, interval.<br /> </p> <!-- Load demo scripts. --> <script type="text/javascript" src="../../vendor/angularjs-2-beta/8/es6-shim.min.js"></script> <script type="text/javascript" src="../../vendor/angularjs-2-beta/8/Rx.umd.min.js"></script> <script type="text/javascript"> // --------------------------------------------------------------------------- // // NOTE: This demo really has nothing to do with Angular 2. However, I am // learning RxJS in the context of learning Angular 2 (currently in Beta 8), // so I think mentioning it is goo framing for my mindset and use-cases. // --------------------------------------------------------------------------- // var which = ( +location.search.slice( -1 ) || 1 ); console.info( "Running with stream:", which ); // When coming from the world of Promise chains, it can be very confusing to // think about how errors are handled in RxJS streams. Where as in Promise // chains, you can "transform" an error (ie, a rejection) into a resolution // if you want to, the same cannot be said exactly of RxJS streams. In RxJS, // an error can be transformed at the STREAM LEVEL but not necessarily at the // VALUE LEVEL. As such, we're going to experiment with how .catch() interacts // with different source streams. switch ( which ) { // In this case, we're going to manually create a HOT stream that will // keep emitting values every second until the end of time. case 1: var eventSubject = new Rx.Subject(); var source = Rx.Observable.from( eventSubject ); var i = 0; setInterval( function triggerEvent() { eventSubject.next( i++ ); }, 1000 ); break; // In this case, we're going to implicitly create a HOT stream through the // .publish() operator. case 2: var source = Rx.Observable.interval( 1000 ) .publish() ; source.connect(); // Drop it like it's hot! break; // In this case, we're going to implicit create a HOT stream; but, this time, // we're not necessarily going to let it run forever - we're using the // .refCount() operator to ensure that the underlying source is disconnected // when the published source loses all of its subscriptions. case 3: var source = Rx.Observable.interval( 1000 ) .publish() .refCount() ; // Uncomment this to demonstrate how .refCount() will keep HOT stream // open after .catch() fires IF there is an active subscriber. // -- // source.subscribe( function noop() {} ); break; // In this case, we're going to create a COLD stream that will emit a value // every second to each one if its subscribers. As a COLD stream, each // subscriber will get its own unique set of values, starting with 0. case 4: var source = Rx.Observable.interval( 1000 ); break; } // --------------------------------------------------------------------------- // // --------------------------------------------------------------------------- // // Using the defined source, let's configure our observable stream transformation. var stream = source .do( function logStart( value ) { console.log( "- - - - - - - - - -" ); console.log( "Value:", value ); } ) .map( function transformValue( value ) { return( value + 0.1 ); } ) // CAUTION: The flatMap() operator will occasionally throw an error due to // the MOD logic we are using internally. .flatMap( sendToServer ) // When we catch the error thrown by the .flatMap() operator (or any uncaught // error higher up in the stream), we need to respond by returning another // STREAM that the subscriber will switch over to. .catch( function handleError( error ) { console.warn( "Error caught, re-routing back to source stream." ); // In this case, when the stream fails, we're going to catch that // failure and re-route the subscription back to the SAME STREAM. The // behavior of this re-route depends heavily on the nature of the // stream and whether or not it is HOT or COLD. // -- // NOTE: Despite the fact that we are re-routing to the same stream, // the observer is still being unsubscribed from the stream and then // "re" subscribed to it. You can see this very clearly with COLD // streams as they "restart" upon catch. return( stream ); } ) // NOTE: As it turns out, the .retry() operator, when the retry-count is // omitted, does the same thing - it just re-subscribes to the source // stream an indefinite number of times. // -- // .retry() .finally( function handleFinally() { console.warn( "Stream finished." ); } ) ; // Subscribe to the stream. var subscription = stream.subscribe( function handleValue( value ) { console.log( "Subscribe:", value ); }, function handleError( error ) { console.error( "Subscribe:", error ); } ); // --------------------------------------------------------------------------- // // --------------------------------------------------------------------------- // // I simulate sending data to the server. And, I simulate network FAILURE every // fourth (or so) invocation. Returns a stream in either case. function sendToServer( value ) { // Simulate successful send. if ( ( value < 1 ) || ( Math.floor( value ) % 4 ) ) { console.log( "Sent to server:", value ); return( Rx.Observable.of( value ) ); // Simulate failed send. } else { console.warn( "HTTP error." ); return( Rx.Observable.throw( new Error( "Network Error" ) ) ); } } </script> </body> </html>