Skip to content

Instantly share code, notes, and snippets.

@patroza
Created February 1, 2015 12:16
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save patroza/f58e682c5bac47ff5c6f to your computer and use it in GitHub Desktop.
ReactiveProcess
/// <summary>
/// Uses character by character parsing for StandardOutput and StandardError, so \r can be processed.
/// Uses observables instead of eventhandlers. Do not call BeginReadStandardOutput and BeginReadStandardError etc
/// </summary>
public class ReactiveProcess : Process
{
readonly CompositeDisposable _observables;
readonly Subject<string> _standardErrorObserable;
readonly Subject<string> _standardOutputObserable;
public ReactiveProcess() {
_standardOutputObserable = new Subject<string>();
_standardErrorObserable = new Subject<string>();
_observables = new CompositeDisposable {_standardOutputObserable, _standardErrorObserable};
}
public IObservable<string> StandardOutputObservable { get { return _standardOutputObserable.AsObservable(); } }
public IObservable<string> StandardErrorObservable { get { return _standardErrorObserable.AsObservable(); } }
protected override void Dispose(bool disposing) {
base.Dispose(disposing);
if (disposing)
_observables.Dispose();
}
/// <summary>
/// Validates StartInfo. Completes once the process has exited
/// </summary>
/// <returns></returns>
public Task StartAsync() {
this.Validate();
Start();
return Task.WhenAll(CreateAfterLaunchTasks());
}
IEnumerable<Task> CreateAfterLaunchTasks() {
var tasks = new List<Task>();
if (StartInfo.RedirectStandardOutput)
tasks.Add(Task.Run(() => ReadStreamToEnd(StandardOutput, _standardOutputObserable)));
if (StartInfo.RedirectStandardError)
tasks.Add(Task.Run(() => ReadStreamToEnd(StandardError, _standardErrorObserable)));
tasks.Add(Task.Run(() => WaitForExit()));
return tasks;
}
static async Task ReadStreamToEnd(StreamReader stream, IObserver<string> observable) {
try {
var readBuffer = new char[1];
var lineBuffer = new StringBuilder();
while ((await stream.ReadAsync(readBuffer, 0, 1).ConfigureAwait(false)) > 0) {
var c = readBuffer[0];
lineBuffer.Append(c);
if (c != '\r' && c != '\n')
continue;
observable.OnNext(lineBuffer.ToString());
lineBuffer.Clear();
}
if (lineBuffer.Length > 0)
observable.OnNext(lineBuffer.ToString());
} finally {
observable.OnCompleted();
}
}
}
public static class ProcessValidationExtensions
{
public static void Validate(this ProcessStartInfo startInfo) {
if ((startInfo.Verb == "runas")
&& !startInfo.UseShellExecute)
throw new NotSupportedException("Cannot use verb: runas, when shellExecute is disabled");
if ((startInfo.RedirectStandardError || startInfo.RedirectStandardInput || startInfo.RedirectStandardOutput)
&& startInfo.UseShellExecute)
throw new NotSupportedException("Cannot use redirect, when shellExecute is enabled");
}
public static void Validate(this Process process) {
process.StartInfo.Validate();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment