<!doctype html>
<html>
<head>
	<meta charset="utf-8" />

	<title>
		Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2
	</title>

	<!-- NOTE: LINK is a void element, so it must not be given a closing tag. -->
	<link rel="stylesheet" type="text/css" href="./demo.css" />
</head>
<body>

	<h1>
		Experimenting With The .catch() Operator And Stream Continuation In RxJS And Angular 2
	</h1>

	<!--
		Each link reloads this page with a different query-string. The inline script
		further down reads the last character of location.search in order to decide
		which source stream to run.
	-->
	<p>
		<a href="./?1">Stream 1</a>: Hot, interval, custom.<br />
		<a href="./?2">Stream 2</a>: Hot, interval, publish.<br />
		<a href="./?3">Stream 3</a>: Hot, interval, refCount.<br />
		<a href="./?4">Stream 4</a>: Cold, interval.<br />
	</p>

	<!-- Load demo scripts. -->
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/es6-shim.min.js"></script>
	<script type="text/javascript" src="../../vendor/angularjs-2-beta/8/Rx.umd.min.js"></script>
	<script type="text/javascript">

		// --------------------------------------------------------------------------- //
		// NOTE: This demo really has nothing to do with Angular 2. However, I am
		// learning RxJS in the context of learning Angular 2 (currently in Beta 8),
		// so I think mentioning it is good framing for my mindset and use-cases.
		// --------------------------------------------------------------------------- //

		// The final character of the query-string (ex, "./?3") selects the stream to
		// run. When that character is missing, zero, or non-numeric, the coerced
		// value is falsy and we default to 1.
		var which = ( Number( location.search.slice( -1 ) ) || 1 );

		console.info( "Running with stream:", which );

		// When coming from the world of Promise chains, it can be very confusing to
		// think about how errors are handled in RxJS streams. Whereas in Promise
		// chains, you can "transform" an error (ie, a rejection) into a resolution
		// if you want to, the same cannot be said exactly of RxJS streams. In RxJS,
		// an error can be transformed at the STREAM LEVEL but not necessarily at the
		// VALUE LEVEL. As such, we're going to experiment with how .catch() interacts
		// with different source streams.
		// The source stream is declared once, up-front, since every branch of the
		// switch below assigns it.
		var source;

		switch ( which ) {
			// An unrecognized selection (ex, "./?7") would otherwise leave the source
			// undefined, causing the stream configuration further down to throw. As
			// such, we warn about it and then intentionally FALL THROUGH to stream 1.
			// --
			// NOTE: The default clause is only entered when no case label matches,
			// regardless of where it is placed within the switch.
			default:

				console.warn( "Unknown stream:", which, "- falling back to stream 1." );

			// In this case, we're going to manually create a HOT stream that will
			// keep emitting values every second until the end of time.
			case 1:

				var eventSubject = new Rx.Subject();
				var i = 0;

				source = Rx.Observable.from( eventSubject );

				// The timer pushes values into the subject whether or not anyone is
				// subscribed, which is what makes this stream HOT.
				setInterval(
					function triggerEvent() {

						eventSubject.next( i++ );

					},
					1000
				);

			break;

			// In this case, we're going to implicitly create a HOT stream through the
			// .publish() operator.
			case 2:

				source = Rx.Observable.interval( 1000 )
					.publish()
				;

				source.connect(); // Drop it like it's hot!

			break;

			// In this case, we're going to implicitly create a HOT stream; but, this
			// time, we're not necessarily going to let it run forever - we're using the
			// .refCount() operator to ensure that the underlying source is disconnected
			// when the published source loses all of its subscriptions.
			case 3:

				source = Rx.Observable.interval( 1000 )
					.publish()
					.refCount()
				;

				// Uncomment this to demonstrate how .refCount() will keep HOT stream
				// open after .catch() fires IF there is an active subscriber.
				// --
				// source.subscribe( function noop() {} );

			break;

			// In this case, we're going to create a COLD stream that will emit a value
			// every second to each one of its subscribers. As a COLD stream, each
			// subscriber will get its own unique set of values, starting with 0.
			case 4:

				source = Rx.Observable.interval( 1000 );

			break;
		}


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// With the source selected, build the transformation pipeline that the
		// subscriber will ultimately consume.
		var stream = source

			// Log each raw value as it enters the pipeline.
			.do(
				function announceValue( rawValue ) {
					console.log( "- - - - - - - - - -" );
					console.log( "Value:", rawValue );
				}
			)

			// Offset the value by a fraction before it is "sent" to the server.
			.map(
				function offsetValue( rawValue ) {
					return rawValue + 0.1;
				}
			)

			// CAUTION: sendToServer() returns an error stream for some values (based
			// on its internal MOD logic), which surfaces here as a stream error.
			.flatMap( sendToServer )

			// An error at the stream level cannot be swapped out for a value; instead,
			// the handler has to hand back another STREAM for the subscriber to be
			// switched over to.
			.catch(
				function rerouteToStream( error ) {
					console.warn( "Error caught, re-routing back to source stream." );

					// We deliberately re-route to the SAME STREAM that just failed. What
					// happens next depends on whether the underlying source is HOT or
					// COLD.
					// --
					// NOTE: Even though the target is the same stream, the observer is
					// still unsubscribed and then subscribed again. COLD streams make
					// this obvious since they "restart" after the catch.
					return stream;
				}
			)

			// NOTE: Calling .retry() without a retry-count turns out to behave the
			// same way - it re-subscribes to the source stream indefinitely.
			// --
			// .retry()

			.finally(
				function announceFinish() {
					console.warn( "Stream finished." );
				}
			)
		;


		// Kick things off by subscribing to the configured stream. Both emitted
		// values and any uncaught stream error are simply logged to the console.
		var subscription = stream.subscribe(
			function logValue( value ) {
				console.log( "Subscribe:", value );
			},
			function logError( error ) {
				console.error( "Subscribe:", error );
			}
		);


		// --------------------------------------------------------------------------- //
		// --------------------------------------------------------------------------- //


		// I simulate a round-trip to the server and always return a stream: a
		// single-value stream when the send succeeds, or an error stream when the
		// (simulated) network fails. Values below 1 always succeed; beyond that, the
		// send fails whenever the floored value divides evenly by 4.
		function sendToServer( value ) {

			var isSuccessful = ( ( value < 1 ) || Boolean( Math.floor( value ) % 4 ) );

			// Simulate failed send.
			if ( ! isSuccessful ) {

				console.warn( "HTTP error." );

				return Rx.Observable.throw( new Error( "Network Error" ) );

			}

			// Simulate successful send.
			console.log( "Sent to server:", value );

			return Rx.Observable.of( value );

		}

	</script>

</body>
</html>