Last active
October 16, 2015 22:49
-
-
Save mauroa/331248814d422db6670d to your computer and use it in GitHub Desktop.
Sample of how to make a Reactive and async Process in C#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
* Sample 1: | |
var processInfo = new ProcessStartInfo (); | |
//TODO: Configure processInfo as desired | |
try { | |
//Wait for the process to complete and exit
await new Process().RunAsync(processInfo); | |
} catch(Exception ex) { | |
//Any failure lands here, including an exception that carries the process's STDERR output (if STDERR redirection is configured)
} | |
* Sample 2: | |
var processInfo = new ProcessStartInfo (); | |
//TODO: Configure processInfo as desired | |
try { | |
var stdout = new Process().RunAsync(processInfo); | |
//Wait for the process to complete and exit
await stdout.Do(output => { | |
//Do things with each STDOUT line (if STDOUT redirection is configured)
}); | |
} catch(Exception ex) { | |
//Any failure lands here, including an exception that carries the process's STDERR output (if STDERR redirection is configured)
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace System.Diagnostics
{
    /// <summary>
    /// Reactive helpers for running a <see cref="Process"/> and observing its output.
    /// </summary>
    public static class ProcessExtensions
    {
        /// <summary>
        /// Starts <paramref name="process"/> with <paramref name="startInfo"/> and returns a
        /// sequence describing its run.
        /// </summary>
        /// <remarks>
        /// <para>
        /// The process is started immediately, before anyone subscribes. Each subscription then:
        /// emits every non-blank STDOUT line (when STDOUT is redirected), waits for the process
        /// to exit, and finally either emits the literal string <c>"Exited"</c> and completes, or
        /// fails with an <see cref="ApplicationException"/> whose message is
        /// <c>"Error - "</c> followed by the STDERR text (when STDERR is redirected and the
        /// process wrote something other than whitespace to it).
        /// </para>
        /// <para>
        /// The trailing <c>"Exited"</c> element guarantees the sequence is never empty, so it can
        /// be awaited directly even when nothing is redirected.
        /// </para>
        /// <para>
        /// STDOUT is read until end-of-stream, so the sequence does not terminate while another
        /// process still holds the redirected pipe open. Every subscription reads from the same
        /// redirected streams, so subscribe only once per run.
        /// </para>
        /// </remarks>
        /// <param name="process">The process instance to start. Must not already be running.</param>
        /// <param name="startInfo">The start configuration assigned to the process.</param>
        /// <returns>The STDOUT lines followed by <c>"Exited"</c>, delivered on a new thread.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="process"/> or <paramref name="startInfo"/> is null.
        /// </exception>
        /// <exception cref="ApplicationException">The process is already running.</exception>
        public static IObservable<string> RunAsync(this Process process, ProcessStartInfo startInfo)
        {
            if (process == null) {
                throw new ArgumentNullException("process");
            }

            if (startInfo == null) {
                throw new ArgumentNullException("startInfo");
            }

            if (process.IsRunning()) {
                throw new ApplicationException("The process is already running");
            }

            // Without this the Exited event is never raised.
            process.EnableRaisingEvents = true;
            process.StartInfo = startInfo;
            process.Start();

            return Observable.Defer(() => {
                // Redirected streams are only available when the shell is not used.
                var canRedirect = !process.StartInfo.UseShellExecute;

                // STDERR is drained from the moment of subscription, concurrently with STDOUT,
                // so a process that fills its STDERR pipe cannot block forever.
                var stderrText = canRedirect && process.StartInfo.RedirectStandardError
                    ? StartReadingStandardError(process)
                    : Observable.Return(string.Empty);

                var stdoutLines = canRedirect && process.StartInfo.RedirectStandardOutput
                    ? ReadStandardOutputLines(process)
                    : Observable.Empty<string>();

                // Once the process has exited and STDERR is fully read, decide how to finish:
                // silent STDERR means success, anything else is reported as a failure.
                var outcome = WhenExited(process)
                    .Zip(stderrText, (exitMessage, stderr) => string.IsNullOrWhiteSpace(stderr)
                        ? Observable.Return(exitMessage)
                        : Observable.Throw<string>(new ApplicationException(string.Format("Error - {0}", stderr))))
                    .Concat();

                // A single pipeline keeps observer notifications serialized and guarantees the
                // terminal signal comes after the last output line.
                return stdoutLines
                    .Concat(outcome)
                    .ObserveOn(NewThreadScheduler.Default);
            });
        }

        /// <summary>
        /// Checks whether <paramref name="process"/> refers to a process that is currently running.
        /// </summary>
        /// <param name="process">The process to inspect.</param>
        /// <returns>
        /// <c>true</c> when a process with the same id can be looked up; <c>false</c> when the
        /// lookup fails for any reason (for example, the process was never started).
        /// </returns>
        public static bool IsRunning(this Process process)
        {
            try {
                // The looked-up Process owns native resources, so release it right away.
                using (Process.GetProcessById(process.Id)) {
                    return true;
                }
            } catch (Exception) {
                // Deliberate best effort: any failure to resolve the id means "not running".
                return false;
            }
        }

        // Emits each non-blank STDOUT line until the stream reports end-of-stream (null line).
        private static IObservable<string> ReadStandardOutputLines(Process process)
        {
            return Observable
                .FromAsync<string>(() => process.StandardOutput.ReadLineAsync())
                .Repeat()
                .TakeWhile(line => line != null)
                .Where(line => !string.IsNullOrWhiteSpace(line));
        }

        // Begins reading all of STDERR right away and exposes the eventual text as a one-element sequence.
        private static IObservable<string> StartReadingStandardError(Process process)
        {
            var pendingRead = process.StandardError.ReadToEndAsync();

            return Observable.FromAsync<string>(() => pendingRead);
        }

        // Emits "Exited" exactly once and completes when the process has exited, including the
        // case where it exited before the subscription was made.
        private static IObservable<string> WhenExited(Process process)
        {
            return Observable.Create<string>(observer => {
                // AsyncSubject is thread-safe and ignores signals after completion, so the
                // event and the HasExited check below cannot produce a duplicate notification.
                var signal = new AsyncSubject<string>();
                var forwarding = signal.Subscribe(observer);

                EventHandler onExited = (sender, args) => {
                    signal.OnNext("Exited");
                    signal.OnCompleted();
                };

                process.Exited += onExited;

                // The process was started before this subscription, so the event may already
                // have fired; checking after attaching the handler closes that gap.
                if (process.HasExited) {
                    onExited(process, EventArgs.Empty);
                }

                return new CompositeDisposable(
                    Disposable.Create(() => process.Exited -= onExited),
                    forwarding);
            });
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment