Skip to content

Instantly share code, notes, and snippets.

@mauroa
Last active May 10, 2016 20:20
Show Gist options
  • Save mauroa/ab3c54c6e710fed4dc39 to your computer and use it in GitHub Desktop.
Save mauroa/ab3c54c6e710fed4dc39 to your computer and use it in GitHub Desktop.
Async Process in C#
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.Diagnostics
{
public static class Extensions
{
public static bool IsRunning (this Process process)
{
try {
Process.GetProcessById (process.Id);
return true;
} catch {
return false;
}
}
public static Process GetParentProcess (this Process process)
{
return GetParentProcess (process.Handle);
}
public static Process GetParentProcess (int processId)
{
var process = Process.GetProcessById (processId);
return process.GetParentProcess ();
}
public static IObservable<string> RunAsync (this Process process, ProcessStartInfo startInfo)
{
return process.RunAsync (startInfo, CancellationToken.None);
}
public static IObservable<string> RunAsync (this Process process, ProcessStartInfo startInfo, CancellationToken cancellationToken)
{
if (process.IsRunning ()) {
throw new Exception ("The process is already running");
}
if (startInfo.UseShellExecute) {
startInfo.RedirectStandardOutput = false;
startInfo.RedirectStandardError = false;
}
process.StartInfo = startInfo;
var cancellationRegistration = cancellationToken.Register (() => {
try {
process.Kill ();
} catch (InvalidOperationException) {
// If the process has already exited this could happen
}
});
cancellationToken.ThrowIfCancellationRequested ();
var processObservable = GetProcessObservable (process, cancellationRegistration);
process.Start ();
cancellationToken.ThrowIfCancellationRequested ();
return processObservable;
}
public static IObservable<string> ObserveAsync (this Process process)
{
return GetProcessObservable (process);
}
static IObservable<string> GetProcessObservable (Process process)
{
return GetProcessObservable (process, default (CancellationTokenRegistration));
}
static IObservable<string> GetProcessObservable (Process process, CancellationTokenRegistration cancellationRegistration)
{
return Observable.Create<string> (observer => {
var disposables = new List<IDisposable>();
var processSubscription = ObserveProcess (process, observer);
disposables.Add (processSubscription);
if (cancellationRegistration != default (CancellationTokenRegistration)) {
disposables.Add (cancellationRegistration);
}
disposables.Add (process);
return new CompositeDisposable (disposables);
});
}
static IDisposable ObserveProcess (Process process, IObserver<string> observer)
{
return Observable
.Merge (ObserveStdout (process), ObserveStderr (process), ObserveProcess (process))
.ObserveOn (NewThreadScheduler.Default)
.Subscribe (async message => {
if (string.IsNullOrWhiteSpace (message)) {
return;
}
if (message.StartsWith ("Error:")) {
var error = message.Substring (message.IndexOf (":") + 1);
observer.OnNext ("Process Error");
observer.OnError (new Exception (error));
} else {
observer.OnNext (message);
if(message == "Process Exited") {
//Wait just in case the stdout and stderr processing is not yet completed
await Task.Delay (1000).ConfigureAwait (continueOnCapturedContext: false);
observer.OnCompleted ();
}
}
}, ex => {
observer.OnError (ex);
});
}
static IObservable<string> ObserveStdout (Process process)
{
if (process.StartInfo.UseShellExecute || !process.StartInfo.RedirectStandardOutput) {
return new Subject<string> ();
}
return Observable
.FromAsync (() => {
return process.StandardOutput.ReadLineAsync ();
})
.Repeat ()
.TakeWhile (_ => process.IsRunning ());
}
static IObservable<string> ObserveStderr (Process process)
{
if (process.StartInfo.UseShellExecute || !process.StartInfo.RedirectStandardError) {
return new Subject<string> ();
}
return Observable
.FromAsync (() => {
return process.StandardError.ReadToEndAsync ();
})
.Where (stderr => !string.IsNullOrWhiteSpace (stderr))
.Select (stderr => string.Format ("Error:{0}", stderr));
}
static IObservable<string> ObserveProcess (Process process)
{
return Observable
.FromAsync (() => {
return Task.Run (() => {
process.WaitForExit ();
});
})
.Select (_ => "Process Exited");
}
static Process GetParentProcess (IntPtr handle)
{
var info = new ParentProcessInfo ();
var returnLength = default (int);
var status = ParentProcessInfo.NtQueryInformationProcess (handle, 0, ref info, Marshal.SizeOf (info), out returnLength);
if (status != 0) {
return null;
}
try {
return Process.GetProcessById (info.InheritedFromUniqueProcessId.ToInt32 ());
} catch (ArgumentException) {
// Process not found
return null;
}
}
}
[StructLayout (LayoutKind.Sequential)]
internal struct ParentProcessInfo
{
// These members must match PROCESS_BASIC_INFORMATION
internal IntPtr Reserved1;
internal IntPtr PebBaseAddress;
internal IntPtr Reserved2_0;
internal IntPtr Reserved2_1;
internal IntPtr UniqueProcessId;
internal IntPtr InheritedFromUniqueProcessId;
[DllImport ("ntdll.dll")]
internal static extern int NtQueryInformationProcess (IntPtr processHandle, int processInformationClass, ref ParentProcessInfo processInformation, int processInformationLength, out int returnLength);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment