ReactiveProcess
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Uses character by character parsing for StandardOutput and StandardError, so \r can be processed. | |
/// Uses observables instead of eventhandlers. Do not call BeginReadStandardOutput and BeginReadStandardError etc | |
/// </summary> | |
public class ReactiveProcess : Process | |
{ | |
readonly CompositeDisposable _observables; | |
readonly Subject<string> _standardErrorObserable; | |
readonly Subject<string> _standardOutputObserable; | |
public ReactiveProcess() { | |
_standardOutputObserable = new Subject<string>(); | |
_standardErrorObserable = new Subject<string>(); | |
_observables = new CompositeDisposable {_standardOutputObserable, _standardErrorObserable}; | |
} | |
public IObservable<string> StandardOutputObservable { get { return _standardOutputObserable.AsObservable(); } } | |
public IObservable<string> StandardErrorObservable { get { return _standardErrorObserable.AsObservable(); } } | |
protected override void Dispose(bool disposing) { | |
base.Dispose(disposing); | |
if (disposing) | |
_observables.Dispose(); | |
} | |
/// <summary> | |
/// Validates StartInfo. Completes once the process has exited | |
/// </summary> | |
/// <returns></returns> | |
public Task StartAsync() { | |
this.Validate(); | |
Start(); | |
return Task.WhenAll(CreateAfterLaunchTasks()); | |
} | |
IEnumerable<Task> CreateAfterLaunchTasks() { | |
var tasks = new List<Task>(); | |
if (StartInfo.RedirectStandardOutput) | |
tasks.Add(Task.Run(() => ReadStreamToEnd(StandardOutput, _standardOutputObserable))); | |
if (StartInfo.RedirectStandardError) | |
tasks.Add(Task.Run(() => ReadStreamToEnd(StandardError, _standardErrorObserable))); | |
tasks.Add(Task.Run(() => WaitForExit())); | |
return tasks; | |
} | |
static async Task ReadStreamToEnd(StreamReader stream, IObserver<string> observable) { | |
try { | |
var readBuffer = new char[1]; | |
var lineBuffer = new StringBuilder(); | |
while ((await stream.ReadAsync(readBuffer, 0, 1).ConfigureAwait(false)) > 0) { | |
var c = readBuffer[0]; | |
lineBuffer.Append(c); | |
if (c != '\r' && c != '\n') | |
continue; | |
observable.OnNext(lineBuffer.ToString()); | |
lineBuffer.Clear(); | |
} | |
if (lineBuffer.Length > 0) | |
observable.OnNext(lineBuffer.ToString()); | |
} finally { | |
observable.OnCompleted(); | |
} | |
} | |
} | |
public static class ProcessValidationExtensions | |
{ | |
public static void Validate(this ProcessStartInfo startInfo) { | |
if ((startInfo.Verb == "runas") | |
&& !startInfo.UseShellExecute) | |
throw new NotSupportedException("Cannot use verb: runas, when shellExecute is disabled"); | |
if ((startInfo.RedirectStandardError || startInfo.RedirectStandardInput || startInfo.RedirectStandardOutput) | |
&& startInfo.UseShellExecute) | |
throw new NotSupportedException("Cannot use redirect, when shellExecute is enabled"); | |
} | |
public static void Validate(this Process process) { | |
process.StartInfo.Validate(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment