Skip to content

Instantly share code, notes, and snippets.

@james-world
Last active August 29, 2015 14:07
Show Gist options
  • Save james-world/904ca7383a8f1cd349b9 to your computer and use it in GitHub Desktop.
Save james-world/904ca7383a8f1cd349b9 to your computer and use it in GitHub Desktop.
Rx example demonstrating recovering from a stream error

Reproduced from Stack Overflow: http://stackoverflow.com/questions/26421425/zip-or-merge-two-iobservable-sequences-where-one-sequence-can-fail

#Question#

I have a requirement that sees the merging of two IObservable sequences but with the potential for one of these sequences to fail without impacting the observer.

So taking the example from the Intro to Rx book of a sequence of numbers and characters that are zipped together:

nums  --0--1--2|

chars --a--b--c--d--e--f|

result -0--1--2|
      
        a  b  c|
Prints:
0 & a
1 & b
2 & c

How would you go about doing this but allowing for one of these sequences to fail (throw an exception) but without halting the execution of the subscribed observers?

nums  --0--1--2--3|

chars --a--b--X--d--e--f|

result -0--1--2--3|
      
        a  b  X  d|
Prints:
0 & a
1 & b
2 & NO VALUE
3 & d

I'm a bit bamboozled with Catch, Finally, OnErrorResumeNext etc and how I could apply this to my use case. All help greatly appreciated.

#Answer#

Bear in mind the Rx grammar. A sequence is defined as follows:

OnNext* (OnError | OnCompleted)

In other words, a sequence has zero or more OnNext events, followed by an OnError or an OnCompleted. (This ignores the temporal aspect in that it could be an infinitely long stream, so a stream that never completed is still valid).

The key point is that a stream can have at most one error. Once it's sent OnError, no more events can be sent.

Now, it's possible to wrap a badly behaving source stream with something like OnErrorResumeNext - but you have to have a new stream for your wrapper to resume with... the first stream is dead.

Typically, in this situation you will have some kind of underlying hot source of events you want to be able to resume "live" (i.e., you don't want to start over from the first event).

###Example Setup###

I will simulate this scenario. Unfortunately, there's going to be a fair amount of setup - but in reality creating these recoverable streams generally takes a bit of work anyway!

I'll first create a factory function that gets a subscription to a contrived hot source of letters emitted once per second. This will be called to "recover" the subscription to the underlying data.

First we can create an error free letter stream by zipping a timer with an array of letters:

var letters = new [] { "a", "b", "c", "d", "e", "f" };
var letterStream = Observable.Interval(TimeSpan.FromSeconds(1))
                             .Zip(letters, (_, ls) => ls);

Now, we will Publish this to make it hot - subscriptions to the published stream will pick it up from wherever it's got to:

var hotLetterStream = letterStream.Publish();

Now we can create an observable that will, on subscription, subscribe to the live stream and fail if it sees letter "c". This is a little tricky, but don't worry too much here - it's not the main point of the example, we just need a stream that gives us the underlying hot data and fails on a particular value. It exhibits the property of observable streams in that they can only error once.

var failStream =
    hotLetterStream.SelectMany(x =>  x == "c"
        ? Observable.Throw<string>(new Exception()) 
        : Observable.Return(x));  

Now we can set up the number stream - it just returns a zero-based value a second for 4 seconds:

var numberStream = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4);

Now we can combine the streams with Zip - we use Catch to replace the failed letter with the "NO VALUE" single valued stream, and then Repeat to seamlessly concatenate a brand new subscription to the hot source:

var combinedStream = numberStream.Zip(
    failStream.Catch<string, Exception>(ex => Observable.Return("NO VALUE"))
              .Repeat(), (ns,ls) => ns + " & " + ls);

Now we can subscribe to this:

combinedStream.Subscribe(Console.WriteLine);

And finally we must "switch on" the published stream with Connect to start values flowing:

hotLetterStream.Connect();

This code runs as written if you pull in nuget package rx-main and produces the following output:

0 & a
1 & b
2 & NO VALUE
3 & d

###Communicating Errors###

Now in this trivial example, we communicated the error by replacing the letter with the "NO VALUE" string. That's fine for this example, and may work for you. In reality however, dealing with a failed stream like this can result in messy checks all over your code.

Fortunately, there is a clean solution. You want to use the idea of an Error monad. This is natively supported in languages like Haskell, but the idea can be easily adopted in Rx. It works by providing a special container - much like Nullable<T> works in .NET - but instead of holding a value or null, it holds a value or an exception.

It can be though of as a specialization of the more general Either<TLeft, TRight> monad - which has a left and right side. This is implemented by Dave Sexton in his wonderful Rxx extension to Rx. The link here is straight to a discussion of Either). It's easy enough to create your own version too.

So instead of subscribing to IObservable<T> you wrap your T in an IObservable<Either<TException, T>>. If the value is good, send it on with Either.Right - if it's bad use Either.Left to forward the exception (this is the general convention - e.g. good "right" bad "left"). You can even create a an Error type to wrap Either and restrict TLeft to an exception. Operators that care about the value can check the left and right properties, operators that don't just pass on the value oblivious to whether it's an error or not - just as .NET functions can work with Nullable<T> without necessarily caring whether the value is a null or not.

In this way, you can propagate an exception down lengthy observable chains to an ultimate subscriber in a clean and understandable fashion.

Another takeaway here is that it's often important to distinguish between errors in the data that you may want to communicate, from errors in the stream plumbing that take down the system. Don't jump to using OnError to communicate a bad piece of data - because this kills the stream. Instead, consider just sending an error value down the stream.

void Main()
{
var letters = new [] { "a", "b", "c", "d", "e", "f" };
// emit one letter every second
var letterStream = Observable.Interval(TimeSpan.FromSeconds(1)).Zip(letters, (_, ls) => ls);
// make this hot
var hotLetterStream = letterStream.Publish();
// set up a stream that subscribes to the hot
var failStream =
hotLetterStream.SelectMany(x => x == "c"
? Observable.Throw<string>(new Exception())
: Observable.Return(x));
var numberStream = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4);
var combinedStream = numberStream.Zip(
failStream.Catch<string, Exception>(ex => Observable.Return("NO VALUE"))
.Repeat(), (ns,ls) => ns + " & " + ls);
combinedStream.Subscribe(Console.WriteLine);
hotLetterStream.Connect();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment