Skip to content

Instantly share code, notes, and snippets.

@mauroa
Created August 7, 2015 15:45
Show Gist options
  • Save mauroa/5af7e64b28c85ddddb62 to your computer and use it in GitHub Desktop.
Save mauroa/5af7e64b28c85ddddb62 to your computer and use it in GitHub Desktop.
Observable Output
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Subjects;
using Xamarin.Messaging.Client;
namespace Xamarin.Messaging.AgentManager
{
/// <summary>
/// Exposes the console output of a <see cref="Process"/> as an observable
/// sequence of text lines.
/// </summary>
/// <remarks>
/// Standard output is forwarded line by line through <see cref="Output"/>.
/// Any non-blank text written to standard error terminates
/// <see cref="Output"/> with a <see cref="MessagingException"/>.
/// Observation starts in the constructor and <see cref="Output"/> is backed by
/// a plain subject, so lines produced before an observer subscribes are not
/// replayed to it.
/// </remarks>
public class StartProcessResult : IStartProcessResult
{
    private bool disposed;
    private IDisposable processSubscription;
    private readonly Process process;
    private readonly Subject<string> output;

    /// <summary>
    /// Creates the result and immediately starts observing the standard
    /// output and standard error streams of <paramref name="process"/>.
    /// </summary>
    /// <param name="process">
    /// The process to observe. Its <c>StandardOutput</c> and
    /// <c>StandardError</c> readers are used, so both streams must be readable.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="process"/> is <c>null</c>.
    /// </exception>
    public StartProcessResult (Process process)
    {
        if (process == null) {
            throw new ArgumentNullException ("process");
        }

        this.output = new Subject<string> ();
        this.process = process;
        this.ObserveProcess (process);
    }

    /// <summary>
    /// Non-blank lines read from the standard output of the process. The
    /// sequence completes when both process streams have been read to the end
    /// without error text, and fails with a <see cref="MessagingException"/>
    /// when the process wrote non-blank text to standard error.
    /// </summary>
    public IObservable<string> Output { get { return this.output; } }

    /// <summary>
    /// Wires the process streams into <see cref="Output"/>.
    /// </summary>
    /// <param name="process">The process whose streams are read.</param>
    private void ObserveProcess (Process process)
    {
        // ReadLineAsync yields null once the stream is exhausted. Stopping on
        // that sentinel (rather than on process.HasExited) delivers lines that
        // are still buffered when the process exits and guarantees that a null
        // line never reaches the subscriber.
        var outputObservable = Observable.FromAsync<string> (() => process.StandardOutput.ReadLineAsync ())
            .Repeat ()
            .TakeWhile (line => line != null);

        // ReadToEndAsync returns an empty string when nothing was written to
        // standard error, so blank results are dropped instead of being
        // reported as failures. Real error text becomes a typed exception
        // rather than a string prefix, so a stdout line that happens to start
        // with "Error -" is no longer mistaken for a failure.
        var errorObservable = Observable.FromAsync<string> (() => process.StandardError.ReadToEndAsync ())
            .Where (errorText => !string.IsNullOrWhiteSpace (errorText))
            .SelectMany (errorText => Observable.Throw<string> (
                new MessagingException (string.Format ("Error - {0}", errorText))));

        this.processSubscription = Observable.Merge (NewThreadScheduler.Default, outputObservable, errorObservable)
            .Subscribe (message => {
                // Blank lines carry no information for observers.
                if (!string.IsNullOrWhiteSpace (message)) {
                    this.output.OnNext (message);
                }
            }, ex => {
                this.output.OnError (ex);
            }, () => {
                this.output.OnCompleted ();
            });
    }

    /// <summary>
    /// Stops observing the process streams.
    /// </summary>
    public void Dispose ()
    {
        Dispose (true);
        GC.SuppressFinalize (this);
    }

    /// <summary>
    /// Releases the subscription to the process streams. Calling it more than
    /// once has no further effect.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; managed resources
    /// are only released in that case.
    /// </param>
    protected virtual void Dispose (bool disposing)
    {
        if (disposed) {
            return;
        }

        // The subscription is null if stream observation never got set up.
        if (disposing && this.processSubscription != null) {
            this.processSubscription.Dispose ();
        }

        disposed = true;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment