<!doctype html>
<html>
<head>
	<meta charset="utf-8" />

	<title>
		Throwing Errors In The Future Using RxJS In Angular 2 Beta 6
	</title>
</head>
<body>

	<h1>
		Throwing Errors In The Future Using RxJS In Angular 2 Beta 6
	</h1>

	<!-- Load demo scripts. -->
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/es6-shim.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/6/Rx.umd.min.js"></script>
	<script type="text/javascript">

		// NOTE: While this demo is not Angular 2 Beta 6 specific, it is using the version
		// of RxJS that ships with the Angular 2 Betas. As such, I am referring to this
		// demo as having an Angular 2 Beta 6 context.

		// First, we're going to setup an interval that logs the current time out every
		// second so we can see where our deferred errors fall in the space-time continuum.
		Rx.Observable
			.interval( 1000 )
			.take( 10 )
			.map(
				function convertIntervalToTimeString( i ) {

					return( new Date().toTimeString() );

				}
			)
			.map(
				function stripTimeZone( timeString ) {

					return( timeString.split( " " ).shift() );

				}
			)
			.subscribe(
				function handleNext( time ) {

					console.log( time );

				}
			)
		;


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// First approach: using the sequence factory function.
		var errorStream = Rx.Observable.create(
			function streamFactory( observer ) {

				var timer = setTimeout(
					function deferError() {

						observer.error( new Error( "ApproachOneError" ) );

					},
					2500
				);

				// Return the teardown method for the stream's unsubscribe action.
				return(
					function teardown() {

						clearTimeout( timer );

					}
				);

			}
		);

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach One - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach One - error:", error );

			},
			function handleComplete() {

				console.log( "Approach One - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Second approach: mapping timer onto an error stream.
		var errorStream = Rx.Observable
			.timer( 3500 )
			.flatMap(
				function mapTimerToStream() {

					return( Rx.Observable.throw( new Error( "ApproachTwoError" ) ) );

				}
			)
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Two - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Two - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Two - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach three: Explicitly throwing an error during mapping.
		var errorStream = Rx.Observable
			.timer( 4500 )
			.map(
				function mapTimerToValue( i ) {

					throw( new Error( "ApproachThreeError" ) );

				}
			)
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Three - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Three - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Three - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach four: Implementing a custom stream creation method. Here, we're
		// monkey-patching the core Rx.Observable library to expose a .throwLater()
		// function which will take an error and a dueTime duration. This basically
		// uses the same approach as above but encapsulates the implementation.
		Rx.Observable.throwLater = function( error, dueTime ) {

			var stream = Rx.Observable
				.timer( dueTime )
				.flatMap(
					function mapTimerToStream() {

						return( Rx.Observable.throw( error ) );

					}
				)
			;

			return( stream );

		};

		// Consume the newly-exposed method to throw a future error.
		var errorStream = Rx.Observable
			.throwLater( new Error( "ApproachFourError" ), 5500 )
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Four - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Four - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Four - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach five: concatenating two streams.
		var errorStream = Rx.Observable
			.timer( 6500 )
			.concat( Rx.Observable.throw( new Error( "ApproachFiveError" ) ) )

			// At this point, the stream will actually contains two values - the timer,
			// which would trigger the .handleNext() callback, and the error value,
			// which would trigger the .handleError() callback. We ONLY CARE about the
			// error, so we're going to buffer values and just consume the last one,
			// which is the error.
			// --
			// NOTE: We're not actually "waiting" until the last value - the error will
			// be thrown the moment it is encountered. So, in this case, the concept of
			// "last" is really just "not first."
			.last()
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Five - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Five - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Five - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach six: concatenating two streams.
		var errorStream = Rx.Observable
			.timer( 7500 )
			.concat( Rx.Observable.throw( new Error( "ApproachSixError" ) ) )

			// In this case, we're basically using the same approach as the .last()
			// approach in the previous version. This time, however, we're relying on
			// the fact that errors are greedily consumed in a buffer. So, while our
			// error is in a buffer that already has a value (the timer index), the
			// error, when encountered, will supersede the existing buffer content.
			.bufferCount( 2 )
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Six - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Six - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Six - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach seven: Creating a HOT stream.
		(function isolateVariables() {

			// This approach is no different than one above. Only, this time, we're
			// creating a "Hot" stream instead of a "Cold" stream. This means that the
			// stream will start emitting events before anyone has subscribed to it.
			// Essentially, this means our timer will start immediately.
			var errorStream = Rx.Observable
				.timer( 8500 )
				.concat( Rx.Observable.throw( new Error( "ApproachSevenError" ) ) )
				.last()

				// This creates a common "proxy" subscriber that will emit the same
				// event instances to all subscribers.
				.publish()
			;

			// This connects the common proxy subscriber to the underlying stream,
			// basically telling the underlying stream to start emitting events. The
			// stream is now "Hot" and is producing events that future subscribers
			// may never see (depending on when they subscribe).
			errorStream.connect();

			// To make sure that the underlying stream is actually producing events
			// before we subscribe to it, I'm not actually going to subscribe to the
			// proxy until a few milliseconds before the error is thrown. If the
			// underlying stream is already producing events, the error will be thrown
			// shortly after this.
			setTimeout(
				function setupSubscription() {

					console.info( "Just subscribed approach seven to HOT stream." );
					errorStream.subscribe( observer );

				},
				8300 // Error will be thrown 200ms later.
			);

			var observer = Rx.Subscriber.create(
				function handleNext( value ) {

					console.log( "Approach Seven - next:", value );

				},
				function handleError( error ) {

					console.error( "Approach Seven - error:", error );

				},
				function handleComplete() {

					console.log( "Approach Seven - complete!" );

				}
			);

		})();


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// Approach eight: Creating a custom operator. In this approach, we're creating
		// a custom operator that exposes a .throw() operator on an existing stream.
		// --
		// CAUTION: I don't really understand this just yet - I'm basically copying the
		// workflow outlined here:
		// https://xgrommx.github.io/rx-book/content/guidelines/implementations/index.html
		Rx.Observable.prototype.throw = function( error ) {

			return( Rx.Observable.create( streamFactory.bind( this ) ) );

			function streamFactory( observer ) {

				// Any stream my emit zero or more "values" followed by a "complete"
				// event. As such, we have to watch for two different use-cases: a
				// populated set and an empty set. In cases where we have an empty
				// set, we need to throw the error in the "complete" callback rather
				// than the "next" callback.
				var errorThrown = false;

				var teardown = this.subscribe(
					function handleNext( value ) {

						errorThrown = true;
						observer.error( error );

					},
					observer.error.bind( observer ),
					function hanldeComplete() {

						if ( ! errorThrown ) {

							observer.error( error );

						}

					}
				);

				return( teardown );

			}

		};

		// To me, this is the most natural way to think about this.
		var errorStream = Rx.Observable
			.timer( 9500 )
			.throw( new Error( "ApproachEightError" ) )
		;

		// NOTE: Stream is "Cold," meaning it won't start emitting events until it
		// has a subscriber; and, each subscriber receives a unique set of events.
		errorStream.subscribe(
			function handleNext( value ) {

				console.log( "Approach Eight - next:", value );

			},
			function handleError( error ) {

				console.error( "Approach Eight - error:", error );

			},
			function handleComplete() {

				console.log( "Approach Eight - complete!" );

			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// At this point, all of the event streams have been bound and will emit error
		// events at some point in the near future.
		console.log( "Main event loop has finished executing." );

	</script>

</body>
</html>