Created
August 7, 2015 15:45
-
-
Save mauroa/5af7e64b28c85ddddb62 to your computer and use it in GitHub Desktop.
Observable Output
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Reactive.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Subjects; | |
using Xamarin.Messaging.Client; | |
namespace Xamarin.Messaging.AgentManager | |
{ | |
public class StartProcessResult : IStartProcessResult | |
{ | |
private bool disposed; | |
private IDisposable processSubscription; | |
private readonly Process process; | |
private readonly Subject<string> output; | |
public StartProcessResult (Process process) | |
{ | |
this.output = new Subject<string> (); | |
this.process = process; | |
this.ObserveProcess (process); | |
} | |
public IObservable<string> Output { get { return this.output; } } | |
private void ObserveProcess(Process process) | |
{ | |
var outputObservable = Observable.FromAsync<string> (() => { | |
return process.StandardOutput.ReadLineAsync (); | |
}) | |
.Repeat () | |
.TakeWhile(_ => !process.HasExited); | |
var errorObservable = Observable.FromAsync<string> (() => { | |
return process.StandardError.ReadToEndAsync (); | |
}) | |
.Select (x => string.Format ("Error - {0}", x)); | |
this.processSubscription = Observable.Merge (NewThreadScheduler.Default, outputObservable, errorObservable) | |
.Subscribe(message => { | |
if (message.StartsWith ("Error -")) { | |
this.output.OnError (new MessagingException (message)); | |
} else if (!string.IsNullOrWhiteSpace(message)) { | |
this.output.OnNext (message); | |
} | |
}, ex => { | |
this.output.OnError (ex); | |
}, () => { | |
this.output.OnCompleted (); | |
}); | |
} | |
public void Dispose () | |
{ | |
Dispose (true); | |
GC.SuppressFinalize (this); | |
} | |
protected virtual void Dispose (bool disposing) | |
{ | |
if (disposed) { | |
return; | |
} | |
this.processSubscription.Dispose (); | |
disposed = true; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment