Skip to content

Instantly share code, notes, and snippets.

@mauroa
Last active October 16, 2015 22:49
Show Gist options
  • Save mauroa/331248814d422db6670d to your computer and use it in GitHub Desktop.
Save mauroa/331248814d422db6670d to your computer and use it in GitHub Desktop.
Sample of how to make a Reactive and async Process in C#
* Sample 1:
var processInfo = new ProcessStartInfo ();
//TODO: Configure processInfo as desired
try {
    //Process is IDisposable: dispose it once the run has finished so its OS handles are released
    using (var process = new Process ()) {
        //Awaiting the observable subscribes to it and waits for the process to complete and exit
        await process.RunAsync (processInfo);
    }
} catch (Exception ex) {
    //Reached for any failure, including text written to STDERR by the process (if redirect STDERR is configured)
}
* Sample 2:
var processInfo = new ProcessStartInfo ();
//TODO: Configure processInfo as desired
try {
    //Process is IDisposable: dispose it once the run has finished so its OS handles are released
    using (var process = new Process ()) {
        var stdout = process.RunAsync (processInfo);
        //Awaiting the observable subscribes to it and waits for the process to complete and exit
        await stdout.Do (output => {
            //Do things with each STDOUT line (if redirect STDOUT is configured)
        });
    }
} catch (Exception ex) {
    //Reached for any failure, including text written to STDERR by the process (if redirect STDERR is configured)
}
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace System.Diagnostics
{
public static class ProcessExtensions
{
/// <summary>
/// Starts <paramref name="process"/> with <paramref name="startInfo"/> and exposes the run
/// as an observable sequence of text messages.
/// </summary>
/// <remarks>
/// The process is started immediately, before anyone subscribes. On subscription the sequence
/// yields every non-blank STDOUT line (when STDOUT is redirected), then the literal message
/// <c>"Exited"</c> once the process has exited, and then completes. If STDERR is redirected and
/// the process wrote non-blank text to it, the sequence fails with an
/// <see cref="ApplicationException"/> whose message is <c>"Error - "</c> followed by that text.
/// Redirected streams are read to their end before <c>"Exited"</c> is emitted so that output
/// written just before exit is not lost. Every subscription reads from the same underlying
/// stream readers, so the returned sequence should be subscribed to only once.
/// </remarks>
/// <param name="process">The process component to start; it must not already be running.</param>
/// <param name="startInfo">The start configuration assigned to <paramref name="process"/>.</param>
/// <returns>A sequence of STDOUT lines terminated by the message <c>"Exited"</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="process"/> is null.</exception>
/// <exception cref="ApplicationException">The process is already running.</exception>
public static IObservable<string> RunAsync(this Process process, ProcessStartInfo startInfo)
{
    if (process == null) {
        throw new ArgumentNullException("process");
    }
    if (process.IsRunning()) {
        throw new ApplicationException("The process is already running");
    }

    // The Exited event is only raised when this flag is set.
    process.EnableRaisingEvents = true;
    process.StartInfo = startInfo;
    process.Start();

    return Observable.Create<string>(observer => {
        // The standard streams are only read when the shell is not used to start the process.
        var canRedirect = !process.StartInfo.UseShellExecute;

        // One line per read, repeated until ReadLineAsync reports end-of-stream with null.
        // Stopping on null (rather than on HasExited) keeps the lines that are still buffered
        // when the process exits and avoids handing a null line to the observer.
        var outputLines = canRedirect && process.StartInfo.RedirectStandardOutput
            ? Observable
                .FromAsync<string>(() => process.StandardOutput.ReadLineAsync())
                .Repeat()
                .TakeWhile(line => line != null)
                .Where(line => !string.IsNullOrWhiteSpace(line))
            : Observable.Empty<string>();

        // STDERR is read as a whole. Only non-blank content is a failure, so a process that
        // wrote nothing to STDERR no longer produces a spurious "Error - " exception.
        var errorText = canRedirect && process.StartInfo.RedirectStandardError
            ? Observable
                .FromAsync<string>(() => process.StandardError.ReadToEndAsync())
                .Where(text => !string.IsNullOrWhiteSpace(text))
                .SelectMany(text => Observable.Throw<string>(
                    new ApplicationException(string.Format("Error - {0}", text))))
            : Observable.Empty<string>();

        // The process was started before this subscription, so it may already have exited and
        // the Exited event may already have been raised. Hook the event first, then check
        // HasExited; Take(1) collapses the case where both report the exit.
        var exitEvent = Observable
            .FromEventPattern(h => process.Exited += h, h => process.Exited -= h)
            .Select(_ => "Exited");
        var alreadyExited = Observable.Defer<string>(() => process.HasExited
            ? Observable.Return("Exited")
            : Observable.Empty<string>());
        var exited = exitEvent.Merge(alreadyExited).Take(1);

        // Output and error text are consumed concurrently; "Exited" always comes last.
        return outputLines
            .Merge(errorText)
            .Concat(exited)
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(observer);
    });
}
/// <summary>
/// Reports whether a process with the id of <paramref name="process"/> can currently be
/// looked up on the local computer.
/// </summary>
/// <param name="process">The process component to inspect; null is reported as not running.</param>
/// <returns>
/// <c>true</c> when <see cref="Process.GetProcessById(int)"/> finds a process with the same id;
/// <c>false</c> when the component is null, has no associated process, or no such process is found.
/// </returns>
public static bool IsRunning(this Process process)
{
    if (process == null) {
        return false;
    }

    try {
        // GetProcessById returns a new Process component that owns OS resources; it is only
        // needed as an existence probe, so dispose it immediately instead of leaking it.
        using (Process.GetProcessById(process.Id)) {
            return true;
        }
    } catch (InvalidOperationException) {
        // No process is associated with this component (for example, it was never started).
        return false;
    } catch (ArgumentException) {
        // No running process has this id.
        return false;
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment