Skip to content

Instantly share code, notes, and snippets.

@jnm2
Last active January 4, 2021 15:37
Show Gist options
  • Save jnm2/a8a39b67a584ad555360102407049ae2 to your computer and use it in GitHub Desktop.
Save jnm2/a8a39b67a584ad555360102407049ae2 to your computer and use it in GitHub Desktop.
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;
using System.Xml.Linq;
// Runs the given actions in parallel (one Task.Run per action) while keeping their output grouped:
// console writes and redirected process output made by each action are captured in a per-action
// buffer and replayed to the original console one action at a time, in the order the actions were
// passed in.
//
// actions: the actions to run; an empty array is a no-op.
// Throws ArgumentNullException when actions is null.
// Throws AggregateException (from Task.WaitAll) when one or more actions threw.
// If replaying output itself fails, all remaining output is still replayed and all tasks are awaited
// before the first replay exception is rethrown.
public void RunSimultaneously(params Action[] actions)
{
    if (actions == null) throw new ArgumentNullException("actions");
    if (actions.Length == 0) return;

    var bufferAccessAsyncLocal = default(AsyncLocal<TaskOutputBuffer>);

    // Install the buffering process runner, or reuse the one that is already installed so that the
    // same AsyncLocal is shared with it.
    using (InterceptProcessRunner(Context, original =>
    {
        var alreadyInstalledBufferedProcessRunner = original as BufferedOutputProcessRunner;
        if (alreadyInstalledBufferedProcessRunner != null)
        {
            bufferAccessAsyncLocal = alreadyInstalledBufferedProcessRunner.BufferAccess;
            return original;
        }

        bufferAccessAsyncLocal = new AsyncLocal<TaskOutputBuffer>();
        return new BufferedOutputProcessRunner(original, bufferAccessAsyncLocal);
    }))
    {
        var originalConsole = default(IConsole);

        // Install the buffering console unless one bound to the same AsyncLocal is already in place.
        using (InterceptConsole(Context, original =>
        {
            originalConsole = original;
            var alreadyInstalledBufferedConsole = original as BufferedOutputConsole;
            return alreadyInstalledBufferedConsole != null && alreadyInstalledBufferedConsole.BufferAccess == bufferAccessAsyncLocal
                ? original
                : new BufferedOutputConsole(originalConsole, bufferAccessAsyncLocal);
        }))
        {
            var outputBuffers = new BlockingCollection<TaskOutputBuffer>();
            var actionOrderLock = new object();
            var nextActionIndex = 0;
            var tasks = new System.Threading.Tasks.Task[actions.Length];

            for (var i = 0; i < tasks.Length; i++)
            {
                tasks[i] = System.Threading.Tasks.Task.Run(() =>
                {
                    using (var outputBuffer = new TaskOutputBuffer())
                    {
                        // Ensure that outputBuffers.Add is called in the same order as actions, even if tasks start out of order
                        Action action;
                        lock (actionOrderLock)
                        {
                            action = actions[nextActionIndex];
                            nextActionIndex++;
                            outputBuffers.Add(outputBuffer);
                        }

                        // From here on, the buffering console and process runner route this task's output into its buffer.
                        bufferAccessAsyncLocal.Value = outputBuffer;
                        try
                        {
                            action.Invoke();
                        }
                        catch (Exception ex)
                        {
                            Error(ex.Message);
                            throw;
                        }
                    }
                });
            }

            // Replay every action's output in action order. A failing output action must not abort the
            // replay: leaving this method early would restore the original console and process runner
            // while the tasks are still running, drop the remaining output and leave the tasks unobserved.
            var firstReplayFailure = default(Exception);
            for (var i = 0; i < actions.Length; i++)
            {
                var buffer = outputBuffers.Take();
                for (;;)
                {
                    try
                    {
                        buffer.WriteOutputUntilCompleted(originalConsole);
                        break;
                    }
                    catch (Exception ex)
                    {
                        // The output action that failed has already been taken out of the buffer,
                        // so the next pass resumes with the one after it.
                        if (firstReplayFailure == null) firstReplayFailure = ex;
                    }
                }
            }

            if (firstReplayFailure != null)
            {
                try
                {
                    System.Threading.Tasks.Task.WaitAll(tasks);
                }
                catch (AggregateException)
                {
                    // Each failing action already reported its message through Error(...);
                    // the replay failure is the exception surfaced to the caller.
                }

                // Rethrow the original exception object without resetting its stack trace.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(firstReplayFailure).Throw();
            }

            System.Threading.Tasks.Task.WaitAll(tasks); // throw exceptions, if any
        }
    }
}
// Queue of deferred console operations. Producers append operations; a consumer replays them
// against a console until the buffer has been marked complete via Dispose.
private sealed class TaskOutputBuffer : IDisposable
{
    private readonly BlockingCollection<Action<IConsole>> pendingOutput = new BlockingCollection<Action<IConsole>>();

    // Queues a console operation to be replayed later.
    public void AppendAction(Action<IConsole> outputAction)
    {
        pendingOutput.Add(outputAction);
    }

    // Marks the queue as complete so a replay can finish once it has drained the remaining items.
    // The underlying collection itself is not disposed here.
    public void Dispose()
    {
        pendingOutput.CompleteAdding();
    }

    // Replays queued operations against the console, blocking for new ones until the queue
    // has been completed and emptied.
    public void WriteOutputUntilCompleted(IConsole console)
    {
        foreach (var queuedOperation in pendingOutput.GetConsumingEnumerable())
        {
            queuedOperation.Invoke(console);
        }
    }
}
// Looks through the non-public instance fields returned by GetFields for the instance's runtime type
// and returns the one field whose current value is a T, paired with that value.
// Returns null when no such field exists or when more than one does.
private static Tuple<FieldInfo, T> FindSingleFieldWithValue<T>(object instance)
{
    var candidates = instance.GetType().GetFields(BindingFlags.NonPublic | BindingFlags.Instance);
    Tuple<FieldInfo, T> match = null;

    for (var index = 0; index < candidates.Length; index++)
    {
        var candidate = candidates[index];
        var currentValue = candidate.GetValue(instance);

        if (currentValue is T)
        {
            // A second hit makes the lookup ambiguous.
            if (match != null)
            {
                return null;
            }

            match = Tuple.Create(candidate, (T)currentValue);
        }
    }

    return match;
}
// Replaces the IProcessRunner held in a non-public instance field of the context with whatever
// replacementProvider returns for the current runner.
// Returns a disposable that writes the previous runner back into the field, or null when the
// provider returned the same instance and nothing was changed.
// Throws InvalidOperationException when the context does not have exactly one such field.
private static IDisposable InterceptProcessRunner(ICakeContext context, Func<IProcessRunner, IProcessRunner> replacementProvider)
{
    var match = FindSingleFieldWithValue<IProcessRunner>(context);
    if (match == null)
    {
        throw new InvalidOperationException(context + " does not have exactly one field containing an IProcessRunner instance.");
    }

    var runnerField = match.Item1;
    var previousRunner = match.Item2;
    var newRunner = replacementProvider.Invoke(previousRunner);

    if (newRunner != previousRunner)
    {
        runnerField.SetValue(context, newRunner);
        return On.Dispose(() => runnerField.SetValue(context, previousRunner));
    }

    // Nothing was swapped, so there is nothing to undo.
    return null;
}
// Replaces the IConsole used by the context's log with whatever replacementProvider returns
// for the current console.
// Starting at context.Log, the search follows single ICakeLog fields inward until it reaches a log
// object that has exactly one IConsole field.
// Returns a disposable that writes the previous console back, or null when the provider returned
// the same instance and nothing was changed.
// Throws InvalidOperationException when a log object has neither a single IConsole field nor a
// single ICakeLog field.
private static IDisposable InterceptConsole(ICakeContext context, Func<IConsole, IConsole> replacementProvider)
{
    var log = context.Log;
    var consoleMatch = FindSingleFieldWithValue<IConsole>(log);

    while (consoleMatch == null)
    {
        // No console on this log object: step into the log it wraps, if there is exactly one.
        var innerLogMatch = FindSingleFieldWithValue<ICakeLog>(log);
        if (innerLogMatch == null)
        {
            throw new InvalidOperationException(log + " does not have exactly one field containing an IConsole instance or exactly one field containing an ICakeLog instance.");
        }

        log = innerLogMatch.Item2;
        consoleMatch = FindSingleFieldWithValue<IConsole>(log);
    }

    var consoleOwner = log;
    var consoleFieldInfo = consoleMatch.Item1;
    var previousConsole = consoleMatch.Item2;
    var newConsole = replacementProvider.Invoke(previousConsole);

    if (newConsole != previousConsole)
    {
        consoleFieldInfo.SetValue(consoleOwner, newConsole);
        return On.Dispose(() => consoleFieldInfo.SetValue(consoleOwner, previousConsole));
    }

    // Nothing was swapped, so there is nothing to undo.
    return null;
}
// IConsole decorator. When the AsyncLocal currently holds a TaskOutputBuffer, every write or
// colour change is queued on that buffer for later replay; otherwise it is forwarded to the
// wrapped console immediately.
private sealed class BufferedOutputConsole : IConsole
{
    private readonly IConsole internalConsole;
    private readonly AsyncLocal<TaskOutputBuffer> bufferAccess;

    public BufferedOutputConsole(IConsole internalConsole, AsyncLocal<TaskOutputBuffer> bufferAccess)
    {
        this.internalConsole = internalConsole;
        this.bufferAccess = bufferAccess;
    }

    // The AsyncLocal this console reads its current buffer from.
    internal AsyncLocal<TaskOutputBuffer> BufferAccess
    {
        get { return bufferAccess; }
    }

    public void Write(string format, params object[] arg)
    {
        Route(console => console.Write(format, arg));
    }

    public void WriteLine(string format, params object[] arg)
    {
        Route(console => console.WriteLine(format, arg));
    }

    public void WriteError(string format, params object[] arg)
    {
        Route(console => console.WriteError(format, arg));
    }

    public void WriteErrorLine(string format, params object[] arg)
    {
        Route(console => console.WriteErrorLine(format, arg));
    }

    // Reading the colour is not supported and throws NotImplementedException.
    // NOTE(review): any caller that reads the colour (e.g. to save and restore it) will fail here.
    public ConsoleColor ForegroundColor
    {
        get { throw new NotImplementedException(); }
        set { Route(console => console.ForegroundColor = value); }
    }

    // Reading the colour is not supported and throws NotImplementedException.
    public ConsoleColor BackgroundColor
    {
        get { throw new NotImplementedException(); }
        set { Route(console => console.BackgroundColor = value); }
    }

    public void ResetColor()
    {
        Route(console => console.ResetColor());
    }

    // Queues the operation on the current buffer if there is one; otherwise runs it
    // straight away against the wrapped console.
    private void Route(Action<IConsole> operation)
    {
        var currentBuffer = bufferAccess.Value;
        if (currentBuffer == null)
        {
            operation.Invoke(internalConsole);
        }
        else
        {
            currentBuffer.AppendAction(operation);
        }
    }
}
// IProcessRunner decorator. When the AsyncLocal currently holds a TaskOutputBuffer, processes are
// started with standard output redirected and a replay step that writes that output to the console
// is queued on the buffer; otherwise the call goes straight to the wrapped runner.
private sealed class BufferedOutputProcessRunner : IProcessRunner
{
    private readonly IProcessRunner internalRunner;
    private readonly AsyncLocal<TaskOutputBuffer> bufferAccess;

    public BufferedOutputProcessRunner(IProcessRunner internalRunner, AsyncLocal<TaskOutputBuffer> bufferAccess)
    {
        this.internalRunner = internalRunner;
        this.bufferAccess = bufferAccess;
    }

    // The AsyncLocal this runner reads its current buffer from.
    internal AsyncLocal<TaskOutputBuffer> BufferAccess
    {
        get { return bufferAccess; }
    }

    // Starts the process through the wrapped runner and returns the process it produced.
    // Note that the caller's settings object is modified (RedirectStandardOutput is set to true)
    // whenever a buffer is active.
    public IProcess Start(FilePath filePath, ProcessSettings settings)
    {
        var currentBuffer = bufferAccess.Value;
        if (currentBuffer != null)
        {
            settings.RedirectStandardOutput = true;
            var process = internalRunner.Start(filePath, settings);

            // NOTE(review): the output is only enumerated when the buffer is replayed, which can be
            // after the caller has finished with (and possibly disposed) the process - confirm that
            // GetStandardOutput() is still usable at that point in the Cake version in use.
            currentBuffer.AppendAction(console =>
            {
                foreach (var line in process.GetStandardOutput())
                {
                    // Braces are doubled because the text is passed as WriteLine's format argument.
                    var escapedLine = line.Replace("{", "{{").Replace("}", "}}");
                    console.WriteLine(escapedLine);
                }
            });

            return process;
        }

        return internalRunner.Start(filePath, settings);
    }
}
// Helpers for building IDisposable objects from delegates.
public static class On
{
    // Returns an IDisposable that invokes the given action the first time it is disposed.
    // Further Dispose calls do nothing; a null action results in a disposable that does nothing.
    public static IDisposable Dispose(Action action)
    {
        return new OnDisposeAction(action);
    }

    private sealed class OnDisposeAction : IDisposable
    {
        // Cleared atomically on the first Dispose so the callback cannot run twice.
        private Action pendingCallback;

        public OnDisposeAction(Action action)
        {
            pendingCallback = action;
        }

        public void Dispose()
        {
            var callback = System.Threading.Interlocked.Exchange(ref pendingCallback, null);
            if (callback == null)
            {
                return;
            }

            callback.Invoke();
        }
    }
}
@pitermarx
Copy link

AWESOME WORK! Could you make this a cake addin?

@jnm2
Copy link
Author

jnm2 commented Apr 28, 2017

@pitermarx I was reluctant, but I've just had to update it yet again for compatibility with https://github.com/agc93/Cake.BuildSystems.Module TFS integration. I am strongly considering it ASAP, when my other commitments are met.

@dennisroche
Copy link

Until it is available as a NuGet package, you can include it with the `#load` preprocessor directive (see http://cakebuild.net/docs/fundamentals/preprocessor-directives):

#load "local:?path=tools/RunSimultaneously.cake"

@richardgavel
Copy link

Hey, have you ever seen this kind of error using this gist?

2020-12-07T22:07:59.9572751Z Error: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: No process is associated with this object.
2020-12-07T22:07:59.9573390Z at System.Diagnostics.Process.EnsureState(State state)
2020-12-07T22:07:59.9573751Z at System.Diagnostics.Process.get_HasExited()
2020-12-07T22:07:59.9574150Z at Cake.Core.IO.ProcessWrapper.d__15.MoveNext()
2020-12-07T22:07:59.9574908Z at Submission#0.BufferedOutputProcessRunner.<>c__DisplayClass5_0.b__0(IConsole console) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 290
2020-12-07T22:07:59.9575971Z at Submission#0.TaskOutputBuffer.WriteOutputUntilCompleted(IConsole console) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 99
2020-12-07T22:07:59.9576856Z at Submission#0.RunSimultaneously(Action[] actions) in C:\vsts-agent-1_work\8\s\core\build\run-simultaneously.cake:line 73
2020-12-07T22:07:59.9577663Z at Submission#0.<>b__0_9() in C:\vsts-agent-1_work\8\s\core\build\azure-pipelines-pr-unit.cake:line 35
2020-12-07T22:07:59.9578262Z at Cake.Core.CakeTaskBuilderExtensions.<>c__DisplayClass32_0.b__0(ICakeContext x)
2020-12-07T22:07:59.9578610Z at Cake.Core.CakeTask.d__43.MoveNext()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment