Last active
January 4, 2021 15:37
-
-
Save jnm2/a8a39b67a584ad555360102407049ae2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
using System.Reflection; | |
using System.Threading; | |
using System.Xml.Linq; | |
public void RunSimultaneously(params Action[] actions) | |
{ | |
if (actions.Length == 0) return; | |
var bufferAccessAsyncLocal = default(AsyncLocal<TaskOutputBuffer>); | |
using (InterceptProcessRunner(Context, original => | |
{ | |
var alreadyInstalledBufferedProcessRunner = original as BufferedOutputProcessRunner; | |
if (alreadyInstalledBufferedProcessRunner != null) | |
{ | |
bufferAccessAsyncLocal = alreadyInstalledBufferedProcessRunner.BufferAccess; | |
return original; | |
} | |
else | |
{ | |
bufferAccessAsyncLocal = new AsyncLocal<TaskOutputBuffer>(); | |
return new BufferedOutputProcessRunner(original, bufferAccessAsyncLocal); | |
} | |
})) | |
{ | |
var originalConsole = default(IConsole); | |
using (InterceptConsole(Context, original => | |
{ | |
originalConsole = original; | |
var alreadyInstalledBufferedConsole = original as BufferedOutputConsole; | |
return alreadyInstalledBufferedConsole != null && alreadyInstalledBufferedConsole.BufferAccess == bufferAccessAsyncLocal ? original : | |
new BufferedOutputConsole(originalConsole, bufferAccessAsyncLocal); | |
})) | |
{ | |
var outputBuffers = new BlockingCollection<TaskOutputBuffer>(); | |
var actionOrderLock = new object(); | |
var nextActionIndex = 0; | |
var tasks = new System.Threading.Tasks.Task[actions.Length]; | |
for (var i = 0; i < tasks.Length; i++) | |
tasks[i] = System.Threading.Tasks.Task.Run(() => | |
{ | |
using (var outputBuffer = new TaskOutputBuffer()) | |
{ | |
// Ensure that outputBuffers.Add is called in the same order as actions, even if tasks start out of order | |
Action action; | |
lock (actionOrderLock) | |
{ | |
action = actions[nextActionIndex]; | |
nextActionIndex++; | |
outputBuffers.Add(outputBuffer); | |
} | |
bufferAccessAsyncLocal.Value = outputBuffer; | |
try | |
{ | |
action.Invoke(); | |
} | |
catch (Exception ex) | |
{ | |
Error(ex.Message); | |
throw; | |
} | |
} | |
}); | |
for (var i = 0; i < actions.Length; i++) | |
outputBuffers.Take().WriteOutputUntilCompleted(originalConsole); | |
System.Threading.Tasks.Task.WaitAll(tasks); // throw exceptions, if any | |
} | |
} | |
} | |
private sealed class TaskOutputBuffer : IDisposable | |
{ | |
private readonly BlockingCollection<Action<IConsole>> outputActions = new BlockingCollection<Action<IConsole>>(); | |
public void AppendAction(Action<IConsole> outputAction) | |
{ | |
outputActions.Add(outputAction); | |
} | |
public void Dispose() | |
{ | |
outputActions.CompleteAdding(); | |
} | |
public void WriteOutputUntilCompleted(IConsole console) | |
{ | |
Action<IConsole> action; | |
while (outputActions.TryTake(out action, Timeout.Infinite)) | |
action.Invoke(console); | |
} | |
} | |
private static Tuple<FieldInfo, T> FindSingleFieldWithValue<T>(object instance) | |
{ | |
var fields = instance.GetType().GetFields(BindingFlags.NonPublic | BindingFlags.Instance); | |
var found = (Tuple<FieldInfo, T>)null; | |
foreach (var field in fields) | |
{ | |
var fieldValue = field.GetValue(instance); | |
if (!(fieldValue is T)) continue; | |
if (found != null) return null; | |
found = Tuple.Create(field, (T)fieldValue); | |
} | |
return found; | |
} | |
private static IDisposable InterceptProcessRunner(ICakeContext context, Func<IProcessRunner, IProcessRunner> replacementProvider) | |
{ | |
var processRunnerField = FindSingleFieldWithValue<IProcessRunner>(context); | |
if (processRunnerField == null) | |
throw new InvalidOperationException(context + " does not have exactly one field containing an IProcessRunner instance."); | |
var fieldInfo = processRunnerField.Item1; | |
var originalValue = processRunnerField.Item2; | |
var replacement = replacementProvider.Invoke(originalValue); | |
if (replacement == originalValue) return null; | |
fieldInfo.SetValue(context, replacement); | |
return On.Dispose(() => fieldInfo.SetValue(context, originalValue)); | |
} | |
private static IDisposable InterceptConsole(ICakeContext context, Func<IConsole, IConsole> replacementProvider) | |
{ | |
var log = context.Log; | |
FieldInfo consoleFieldInfo; | |
IConsole originalConsole; | |
for (;;) | |
{ | |
var consoleField = FindSingleFieldWithValue<IConsole>(log); | |
if (consoleField != null) | |
{ | |
consoleFieldInfo = consoleField.Item1; | |
originalConsole = consoleField.Item2; | |
break; | |
} | |
var decoratedLogField = FindSingleFieldWithValue<ICakeLog>(log); | |
if (decoratedLogField == null) | |
throw new InvalidOperationException(log + " does not have exactly one field containing an IConsole instance or exactly one field containing an ICakeLog instance."); | |
log = decoratedLogField.Item2; | |
} | |
var replacement = replacementProvider.Invoke(originalConsole); | |
if (replacement == originalConsole) return null; | |
consoleFieldInfo.SetValue(log, replacement); | |
return On.Dispose(() => consoleFieldInfo.SetValue(log, originalConsole)); | |
} | |
private sealed class BufferedOutputConsole : IConsole | |
{ | |
private readonly IConsole internalConsole; | |
private readonly AsyncLocal<TaskOutputBuffer> bufferAccess; | |
internal AsyncLocal<TaskOutputBuffer> BufferAccess { get { return bufferAccess; } } | |
public BufferedOutputConsole(IConsole internalConsole, AsyncLocal<TaskOutputBuffer> bufferAccess) | |
{ | |
this.internalConsole = internalConsole; | |
this.bufferAccess = bufferAccess; | |
} | |
public void Write(string format, params object[] arg) | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.Write(format, arg)); | |
else | |
internalConsole.Write(format, arg); | |
} | |
public void WriteLine(string format, params object[] arg) | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.WriteLine(format, arg)); | |
else | |
internalConsole.WriteLine(format, arg); | |
} | |
public void WriteError(string format, params object[] arg) | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.WriteError(format, arg)); | |
else | |
internalConsole.WriteError(format, arg); | |
} | |
public void WriteErrorLine(string format, params object[] arg) | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.WriteErrorLine(format, arg)); | |
else | |
internalConsole.WriteErrorLine(format, arg); | |
} | |
public ConsoleColor ForegroundColor | |
{ | |
get | |
{ | |
throw new NotImplementedException(); | |
} | |
set | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.ForegroundColor = value); | |
else | |
internalConsole.ForegroundColor = value; | |
} | |
} | |
public ConsoleColor BackgroundColor | |
{ | |
get | |
{ | |
throw new NotImplementedException(); | |
} | |
set | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.BackgroundColor = value); | |
else | |
internalConsole.BackgroundColor = value; | |
} | |
} | |
public void ResetColor() | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer != null) | |
bufferAccess.Value.AppendAction(console => console.ResetColor()); | |
else | |
internalConsole.ResetColor(); | |
} | |
} | |
private sealed class BufferedOutputProcessRunner : IProcessRunner | |
{ | |
private readonly IProcessRunner internalRunner; | |
private readonly AsyncLocal<TaskOutputBuffer> bufferAccess; | |
internal AsyncLocal<TaskOutputBuffer> BufferAccess { get { return bufferAccess; } } | |
public BufferedOutputProcessRunner(IProcessRunner internalRunner, AsyncLocal<TaskOutputBuffer> bufferAccess) | |
{ | |
this.internalRunner = internalRunner; | |
this.bufferAccess = bufferAccess; | |
} | |
public IProcess Start(FilePath filePath, ProcessSettings settings) | |
{ | |
var outputBuffer = bufferAccess.Value; | |
if (outputBuffer == null) return internalRunner.Start(filePath, settings); | |
settings.RedirectStandardOutput = true; | |
var wrapper = internalRunner.Start(filePath, settings); | |
outputBuffer.AppendAction(console => | |
{ | |
foreach (var line in wrapper.GetStandardOutput()) | |
console.WriteLine(line.Replace("{", "{{").Replace("}", "}}")); | |
}); | |
return wrapper; | |
} | |
} | |
public static class On | |
{ | |
public static IDisposable Dispose(Action action) | |
{ | |
return new OnDisposeAction(action); | |
} | |
private sealed class OnDisposeAction : IDisposable | |
{ | |
private Action action; | |
public OnDisposeAction(Action action) | |
{ | |
this.action = action; | |
} | |
public void Dispose() | |
{ | |
var action = System.Threading.Interlocked.Exchange(ref this.action, null); | |
if (action != null) action.Invoke(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, have you ever seen this kind of error using this gist?
2020-12-07T22:07:59.9572751Z Error: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: No process is associated with this object.
2020-12-07T22:07:59.9573390Z at System.Diagnostics.Process.EnsureState(State state)
2020-12-07T22:07:59.9573751Z at System.Diagnostics.Process.get_HasExited()
2020-12-07T22:07:59.9574150Z at Cake.Core.IO.ProcessWrapper.d__15.MoveNext()
2020-12-07T22:07:59.9574908Z at Submission#0.BufferedOutputProcessRunner.<>c__DisplayClass5_0.b__0(IConsole console) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 290
2020-12-07T22:07:59.9575971Z at Submission#0.TaskOutputBuffer.WriteOutputUntilCompleted(IConsole console) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 99
2020-12-07T22:07:59.9576856Z at Submission#0.RunSimultaneously(Action[] actions) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 73
2020-12-07T22:07:59.9577663Z at Submission#0.<>b__0_9() in C:\vsts-agent-1_work\8\s\core\build\azure-pipelines-pr-unit.cake:line 35
2020-12-07T22:07:59.9578262Z at Cake.Core.CakeTaskBuilderExtensions.<>c__DisplayClass32_0.b__0(ICakeContext x)
2020-12-07T22:07:59.9578610Z at Cake.Core.CakeTask.d__43.MoveNext()