<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Throwing Errors In The Future Using RxJS In Angular 2 Beta 6</title>
</head>
<body>

<h1>Throwing Errors In The Future Using RxJS In Angular 2 Beta 6</h1>

<!-- Load demo scripts. -->
<script src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
<script src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
<script>

// NOTE: While this demo is not Angular 2 Beta 6 specific, it is using the version
// of RxJS that ships with the Angular 2 Betas. As such, I am referring to this
// demo as having an Angular 2 Beta 6 context.

// First, we're going to set up an interval that logs the current time out every
// second so we can see where our deferred errors fall in the space-time continuum.
// The stream is limited to ten ticks by .take( 10 ).
Rx.Observable
  .interval( 1000 )
  .take( 10 )
  .map(
    function convertIntervalToTimeString( i ) {
      // The interval index is ignored; we only want the current wall-clock time.
      return new Date().toTimeString();
    }
  )
  .map(
    function stripTimeZone( timeString ) {
      // Keep only the text before the first space (drops the time-zone suffix).
      return timeString.split( " " ).shift();
    }
  )
  .subscribe(
    function handleNext( time ) {
      console.log( time );
    }
  );


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// First approach: using the sequence factory function.
var errorStream = Rx.Observable.create(
  function streamFactory( observer ) {
    // Push the error into the observer after a 2.5 second delay.
    var timer = setTimeout(
      function deferError() {
        observer.error( new Error( "ApproachOneError" ) );
      },
      2500
    );

    // Return the teardown method for the stream's unsubscribe action. It
    // cancels the pending timer so no error is emitted after unsubscription.
    return function teardown() {
      clearTimeout( timer );
    };
  }
);

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach One - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach One - error:", error );
  },
  function handleComplete() {
    console.log( "Approach One - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Second approach: mapping timer onto an error stream.
var errorStream = Rx.Observable
  .timer( 3500 )
  .flatMap(
    function mapTimerToStream() {
      // Replace the timer's value with a stream that does nothing but error.
      return Rx.Observable.throw( new Error( "ApproachTwoError" ) );
    }
  );

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Two - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Two - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Two - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach three: Explicitly throwing an error during mapping.
var errorStream = Rx.Observable
  .timer( 4500 )
  .map(
    function mapTimerToValue() {
      // The timer value is irrelevant - we throw as soon as it arrives.
      throw new Error( "ApproachThreeError" );
    }
  );

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Three - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Three - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Three - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach four: Implementing a custom stream creation method. Here, we're
// monkey-patching the core Rx.Observable library to expose a .throwLater()
// function which will take an error and a dueTime duration. This basically
// uses the same approach as approach two but encapsulates the implementation.
//
// error   - the value to deliver through the stream's error channel.
// dueTime - the delay handed to Rx.Observable.timer() before the error.
// Returns the resulting stream.
Rx.Observable.throwLater = function( error, dueTime ) {
  return Rx.Observable
    .timer( dueTime )
    .flatMap(
      function mapTimerToStream() {
        return Rx.Observable.throw( error );
      }
    );
};

// Consume the newly-exposed method to throw a future error.
var errorStream = Rx.Observable.throwLater( new Error( "ApproachFourError" ), 5500 );

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Four - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Four - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Four - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach five: concatenating two streams.
var errorStream = Rx.Observable
  .timer( 6500 )
  .concat( Rx.Observable.throw( new Error( "ApproachFiveError" ) ) )
  // At this point, the stream will actually contain two values - the timer,
  // which would trigger the .handleNext() callback, and the error value,
  // which would trigger the .handleError() callback. We ONLY CARE about the
  // error, so we're going to buffer values and just consume the last one,
  // which is the error.
  // --
  // NOTE: We're not actually "waiting" until the last value - the error will
  // be thrown the moment it is encountered. So, in this case, the concept of
  // "last" is really just "not first."
  .last();

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Five - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Five - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Five - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach six: concatenating two streams.
var errorStream = Rx.Observable
  .timer( 7500 )
  .concat( Rx.Observable.throw( new Error( "ApproachSixError" ) ) )
  // In this case, we're basically using the same approach as the .last()
  // approach in the previous version. This time, however, we're relying on
  // the fact that errors are greedily consumed in a buffer. So, while our
  // error is in a buffer that already has a value (the timer index), the
  // error, when encountered, will supersede the existing buffer content.
  .bufferCount( 2 );

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Six - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Six - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Six - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach seven: Creating a HOT stream.
(function isolateVariables() {

  // This approach is no different than one above. Only, this time, we're
  // creating a "Hot" stream instead of a "Cold" stream. This means that the
  // stream will start emitting events before anyone has subscribed to it.
  // Essentially, this means our timer will start immediately.
  var errorStream = Rx.Observable
    .timer( 8500 )
    .concat( Rx.Observable.throw( new Error( "ApproachSevenError" ) ) )
    .last()
    // This creates a common "proxy" subscriber that will emit the same
    // event instances to all subscribers.
    .publish();

  // Build the subscriber up front so that it is visibly defined before the
  // deferred subscription below refers to it.
  var observer = Rx.Subscriber.create(
    function handleNext( value ) {
      console.log( "Approach Seven - next:", value );
    },
    function handleError( error ) {
      console.error( "Approach Seven - error:", error );
    },
    function handleComplete() {
      console.log( "Approach Seven - complete!" );
    }
  );

  // This connects the common proxy subscriber to the underlying stream,
  // basically telling the underlying stream to start emitting events. The
  // stream is now "Hot" and is producing events that future subscribers
  // may never see (depending on when they subscribe).
  errorStream.connect();

  // To make sure that the underlying stream is actually producing events
  // before we subscribe to it, I'm not actually going to subscribe to the
  // proxy until a few milliseconds before the error is thrown. If the
  // underlying stream is already producing events, the error will be thrown
  // shortly after this.
  setTimeout(
    function setupSubscription() {
      console.info( "Just subscribed approach seven to HOT stream." );
      errorStream.subscribe( observer );
    },
    8300 // Error will be thrown 200ms later.
  );

})();


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// Approach eight: Creating a custom operator. In this approach, we're creating
// a custom operator that exposes a .throw() operator on an existing stream.
// --
// CAUTION: I don't really understand this just yet - I'm basically copying the
// workflow outlined here:
// https://xgrommx.github.io/rx-book/content/guidelines/implementations/index.html
//
// error - the value to deliver through the resulting stream's error channel.
// Returns a new stream that subscribes to the source stream and emits the given
// error when the source produces its first value or completes empty. An error
// raised by the source itself is passed through unchanged.
Rx.Observable.prototype.throw = function( error ) {
  // Capture the source stream for use inside the factory below.
  var source = this;

  return Rx.Observable.create(
    function streamFactory( observer ) {
      // Any stream may emit zero or more "values" followed by a "complete"
      // event. As such, we have to watch for two different use-cases: a
      // populated set and an empty set. In cases where we have an empty
      // set, we need to throw the error in the "complete" callback rather
      // than the "next" callback.
      var errorThrown = false;

      // Deliver the error at most once, no matter how many values the
      // source emits or whether it also completes afterwards.
      function throwOnce() {
        if ( errorThrown ) {
          return;
        }

        errorThrown = true;
        observer.error( error );
      }

      var teardown = source.subscribe(
        function handleNext( value ) {
          throwOnce();
        },
        // Errors coming from the source stream are forwarded as-is.
        observer.error.bind( observer ),
        function handleComplete() {
          throwOnce();
        }
      );

      // Hand back the source subscription so that unsubscribing from the
      // resulting stream also unsubscribes from the source.
      return teardown;
    }
  );
};

// To me, this is the most natural way to think about this.
var errorStream = Rx.Observable
  .timer( 9500 )
  .throw( new Error( "ApproachEightError" ) );

// NOTE: Stream is "Cold," meaning it won't start emitting events until it
// has a subscriber; and, each subscriber receives a unique set of events.
errorStream.subscribe(
  function handleNext( value ) {
    console.log( "Approach Eight - next:", value );
  },
  function handleError( error ) {
    console.error( "Approach Eight - error:", error );
  },
  function handleComplete() {
    console.log( "Approach Eight - complete!" );
  }
);


// --------------------------------------------------------------------------- //
// --------------------------------------------------------------------------- //


// At this point, all of the event streams have been bound and will emit error
// events at some point in the near future.
console.log( "Main event loop has finished executing." );

</script>

</body>
</html>