Skip to content

Instantly share code, notes, and snippets.

@Stroniax
Last active October 9, 2023 18:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save Stroniax/a45bfa209c061898d99fc322c594ddf3 to your computer and use it in GitHub Desktop.
Save Stroniax/a45bfa209c061898d99fc322c594ddf3 to your computer and use it in GitHub Desktop.
A PowerShell Job wrapper for System.Threading.Tasks.Task to bridge the gap between asynchronous operations in C# and PowerShell.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Automation;
using System.Threading;
using System.Threading.Tasks;
namespace Casion.PowerShell
{
/// <summary>
/// <para type='description'>A PowerShell Job that represents one or more asynchronous <see cref="Task"/> instances
/// that belong to a logical group (such as multiple tasks created by a single cmdlet invocation).
/// This allows standard PowerShell job operations such as Wait-Job and Receive-Job to be used with the <see cref="Task"/> class.</para>
/// <para type='description'>Each task is wrapped in its own child job. This job reaches a finished state once every child job
/// is <see cref="JobState.Completed"/>, <see cref="JobState.Failed"/> or <see cref="JobState.Stopped"/>.</para>
/// </summary>
public sealed class TaskJob : Job
{
    // Guards child registration in the constructor and the transition of this job to its final state.
    private readonly object _syncObject = new object();

    // True once this job has been moved to its final state. Guarded by _syncObject.
    private bool _isFinished;

    /// <summary>
    /// Returns <see langword="true"/> if any child job has unread data in <see cref="Job.Output"/> or <see cref="Job.Error"/>.
    /// </summary>
    public override bool HasMoreData => ChildJobs.Any(j => j.HasMoreData);

    /// <summary>
    /// The host executing this job.
    /// </summary>
    public override string Location => Environment.MachineName;

    /// <summary>
    /// The status of the child jobs, joined with ", ".
    /// </summary>
    public override string StatusMessage => string.Join(", ", ChildJobs.Select(j => j.StatusMessage));

    /// <summary>
    /// Determines if the <see cref="CancellationTokenSource"/> used by the child jobs should be disposed when the child jobs are disposed. The default value is <see langword="true"/>.
    /// </summary>
    public bool DisposeCancellationTokenSourceOnDisposed
    {
        set
        {
            foreach (var childJob in ChildJobs.OfType<TaskChildJob>())
            {
                childJob.DisposeCancellationTokenSourceOnDisposed = value;
            }
        }
    }

    /// <summary>
    /// Indicates whether any of the tasks passed to this job are expected to write an object to <see cref="Job.Output"/>.
    /// </summary>
    public bool ExpectingOutput => ChildJobs.OfType<TaskChildJob>().Any(j => j.ExpectingOutput);

    /// <summary>
    /// Creates the job and one child job per non-null task.
    /// </summary>
    /// <param name="name">The job name passed to the base <see cref="Job"/> constructor.</param>
    /// <param name="command">The command text passed to the base <see cref="Job"/> constructor and to every child job.</param>
    /// <param name="tasks">The tasks to track, each optionally paired with the <see cref="CancellationTokenSource"/> that cancels it.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> contains no non-null task.</exception>
    private TaskJob(string? name, string? command, IEnumerable<(Task, CancellationTokenSource?)> tasks)
        : base(command, name)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        // Enumerate the caller's sequence exactly once: it may be lazy, and re-running it
        // could yield different tasks for validation than for registration.
        var pairs = tasks.Where(pair => pair.Item1 != null).ToList();
        if (pairs.Count == 0)
        {
            throw new ArgumentException($"One or more {nameof(Task)} values must be provided.", nameof(tasks));
        }

        PSJobTypeName = nameof(TaskJob);
        SetJobState(JobState.Running);

        lock (_syncObject)
        {
            foreach (var (task, cancellationTokenSource) in pairs)
            {
                var childJob = new TaskChildJob(task: task,
                    cancellationTokenSource: cancellationTokenSource,
                    command: command);
                ChildJobs.Add(childJob);
                childJob.StateChanged += OnChildJobCompleted;
            }

            // A task that was already complete may have finished its child job before the
            // handler above was attached, so evaluate the children once explicitly.
            CompleteIfAllChildJobsFinished();
        }
    }

    /// <summary>
    /// Creates a running <see cref="TaskJob"/> for a set of tasks, each with its own optional <see cref="CancellationTokenSource"/>.
    /// </summary>
    /// <param name="name">The job name.</param>
    /// <param name="command">The command text reported by the job.</param>
    /// <param name="tasks">The tasks to track; <see langword="null"/> tasks are skipped.</param>
    /// <returns>The new job.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> contains no non-null task.</exception>
    public static TaskJob StartJob(string? name, string? command, IEnumerable<(Task, CancellationTokenSource?)> tasks)
    {
        return new TaskJob(name, command, tasks);
    }

    /// <summary>
    /// Creates a running <see cref="TaskJob"/> for a set of tasks that share one optional <see cref="CancellationTokenSource"/>.
    /// </summary>
    /// <param name="name">The job name.</param>
    /// <param name="command">The command text reported by the job.</param>
    /// <param name="tasks">The tasks to track; <see langword="null"/> tasks are skipped.</param>
    /// <param name="cancellationTokenSource">The source paired with every task, or <see langword="null"/>.</param>
    /// <returns>The new job.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> contains no non-null task.</exception>
    public static TaskJob StartJob(string? name, string? command, IEnumerable<Task> tasks, CancellationTokenSource? cancellationTokenSource)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        return StartJob(name, command, tasks.Select(t => (t, cancellationTokenSource)));
    }

    /// <summary>
    /// Creates a running <see cref="TaskJob"/> for a single task.
    /// </summary>
    /// <param name="name">The job name.</param>
    /// <param name="command">The command text reported by the job.</param>
    /// <param name="task">The task to track.</param>
    /// <param name="cancellationTokenSource">The source that cancels <paramref name="task"/>, or <see langword="null"/>.</param>
    /// <returns>The new job.</returns>
    /// <exception cref="ArgumentException"><paramref name="task"/> is <see langword="null"/>.</exception>
    public static TaskJob StartJob(string? name, string? command, Task task, CancellationTokenSource? cancellationTokenSource)
        => StartJob(name, command, new[] { task }, cancellationTokenSource);

    /// <summary>
    /// Stops child jobs. This job changes state once all child jobs have finished.
    /// </summary>
    public override void StopJob()
    {
        foreach (var job in ChildJobs)
        {
            job.StopJob();
        }
    }

    /// <summary>
    /// Detaches from and disposes the child jobs, then disposes this job.
    /// </summary>
    /// <param name="disposing"><see langword="true"/> when called from <see cref="IDisposable.Dispose"/>.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // NOTE(review): assumes the base Job.Dispose does not already dispose ChildJobs - confirm.
            // TaskChildJob.Dispose is written to be safe to call more than once either way.
            foreach (var job in ChildJobs)
            {
                job.StateChanged -= OnChildJobCompleted;
                job.Dispose();
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Handles every state change of a child job: check if all child jobs are finished; if so, mark this job as finished.
    /// </summary>
    /// <param name="sender">The child job whose state changed (unused; all children are re-evaluated).</param>
    /// <param name="arguments">The event data (unused).</param>
    private void OnChildJobCompleted(object? sender, EventArgs? arguments)
    {
        lock (_syncObject)
        {
            CompleteIfAllChildJobsFinished();
        }
    }

    /// <summary>
    /// Moves this job to its final state when every child job is Completed, Failed or Stopped.
    /// Failed takes precedence over Stopped, which takes precedence over Completed.
    /// Must be called while holding <see cref="_syncObject"/>.
    /// </summary>
    private void CompleteIfAllChildJobsFinished()
    {
        if (_isFinished)
        {
            return;
        }

        bool hasFailed = false;
        bool hasStopped = false;
        foreach (var job in ChildJobs)
        {
            switch (job.JobStateInfo.State)
            {
                case JobState.Failed:
                    hasFailed = true;
                    break;
                case JobState.Stopped:
                    hasStopped = true;
                    break;
                case JobState.Completed:
                    break;
                default:
                    // StateChanged also fires for intermediate states such as Stopping;
                    // any child that is not in a final state means the group is still active.
                    return;
            }
        }

        _isFinished = true;
        SetJobState(
            hasFailed ? JobState.Failed :
            hasStopped ? JobState.Stopped :
            JobState.Completed);
    }

    /// <summary>
    /// A PowerShell job representing a single <see cref="Task"/>.
    /// </summary>
    private sealed class TaskChildJob : Job
    {
        /// <summary>
        /// Raised after this job has been moved to Completed, Failed or Stopped.
        /// </summary>
        internal event EventHandler? TaskFinished;

        // Serializes StopJob, the task continuation and Dispose so the job reaches a final state only once.
        private readonly object _stateLock = new object();
        private readonly Task _task;
        private readonly CancellationTokenSource? _cts;

        // The Task<TResult>.Result property of the wrapped task, or null when the task produces no value.
        private readonly System.Reflection.PropertyInfo? _resultProperty;

        // True once a final state has been set (or the job was disposed). Guarded by _stateLock.
        private bool _isFinished;
        private bool _disposeCts = true;

        public override bool HasMoreData => Error.Count > 0 || Output.Count > 0;

        public override string Location => Environment.MachineName;

        public override string StatusMessage => _task.Status.ToString();

        /// <summary>
        /// <see langword="true"/> when the task is a <see cref="Task{TResult}"/> whose result type is not VoidTaskResult.
        /// Determined from the task's type only, so it never waits for the task and never observes its exception.
        /// </summary>
        public bool ExpectingOutput => _resultProperty != null;

        /// <summary>
        /// Indicates if the <see cref="CancellationTokenSource"/> passed to this job's constructor (if any) should be disposed when the job is disposed.
        /// Defaults to <see langword="true"/>.
        /// </summary>
        public bool DisposeCancellationTokenSourceOnDisposed
        {
            set
            {
                _disposeCts = value;
            }
        }

        /// <summary>
        /// Creates a running job that finishes when <paramref name="task"/> finishes.
        /// </summary>
        /// <param name="task">The task to track.</param>
        /// <param name="cancellationTokenSource">The source used by <see cref="StopJob"/> to cancel the task, or <see langword="null"/>.</param>
        /// <param name="command">The command text passed to the base <see cref="Job"/> constructor.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <see langword="null"/>.</exception>
        internal TaskChildJob(Task task, CancellationTokenSource? cancellationTokenSource = null, string? command = null)
            : base(command)
        {
            _task = task ?? throw new ArgumentNullException(nameof(task));
            _cts = cancellationTokenSource;
            _resultProperty = FindResultProperty(task);
            PSJobTypeName = nameof(TaskChildJob);
            SetJobState(JobState.Running);

            // Registered last so every field is assigned before the continuation can run.
            // The scheduler is explicit so the bookkeeping does not depend on the caller's ambient scheduler.
            _task.ContinueWith(OnTaskCompleted, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        }

        /// <summary>
        /// If a <see cref="CancellationTokenSource"/> was passed to this job's constructor, it will be cancelled.
        /// Otherwise, the job state will just be set to <see cref="JobState.Stopped"/>.
        /// Does nothing when the job has already finished.
        /// </summary>
        public override void StopJob()
        {
            lock (_stateLock)
            {
                if (_isFinished)
                {
                    // Never move a finished job back to Stopping; nothing would ever finish it again.
                    return;
                }

                SetJobState(JobState.Stopping);
            }

            // Cancel outside the lock: cancellation callbacks run synchronously on this thread.
            if (TryRequestCancellation())
            {
                // The task continuation sets the final state once the task ends.
                return;
            }

            // to prevent the job from hanging, we'll say the job is stopped
            // if we can't stop it.
            lock (_stateLock)
            {
                if (_isFinished)
                {
                    return;
                }

                _isFinished = true;
                SetJobState(JobState.Stopped);
            }

            TaskFinished?.Invoke(this, EventArgs.Empty);
        }

        /// <summary>
        /// Disposes the task (when it has completed) and, if enabled, the cancellation token source.
        /// </summary>
        /// <param name="disposing"><see langword="true"/> when called from <see cref="IDisposable.Dispose"/>.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                lock (_stateLock)
                {
                    // Keep a continuation that runs after disposal from touching this job.
                    _isFinished = true;
                }

                // Task.Dispose throws InvalidOperationException unless the task is in a completion state.
                if (_task.IsCompleted)
                {
                    _task.Dispose();
                }

                if (_disposeCts)
                {
                    _cts?.Dispose();
                }
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Finds the <c>Result</c> property of the <see cref="Task{TResult}"/> that <paramref name="task"/> is or derives from.
        /// </summary>
        /// <param name="task">The task to inspect.</param>
        /// <returns>The property, or <see langword="null"/> for a non-generic task or one whose result type is named VoidTaskResult.</returns>
        private static System.Reflection.PropertyInfo? FindResultProperty(Task task)
        {
            // Walk the hierarchy because the runtime type may be a class derived from Task<TResult>.
            for (Type? type = task.GetType(); type != null; type = type.BaseType)
            {
                if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(Task<>))
                {
                    continue;
                }

                var resultType = type.GetGenericArguments()[0];
                return string.Equals(resultType.Name, "VoidTaskResult", StringComparison.Ordinal)
                    ? null
                    : type.GetProperty(nameof(Task<object>.Result));
            }

            return null;
        }

        /// <summary>
        /// Requests cancellation through the token source, if there is a usable one.
        /// </summary>
        /// <returns><see langword="true"/> if cancellation was requested; <see langword="false"/> if there is no source or it was already disposed.</returns>
        private bool TryRequestCancellation()
        {
            if (_cts is null)
            {
                return false;
            }

            try
            {
                _cts.Cancel();
                return true;
            }
            catch (ObjectDisposedException)
            {
                return false;
            }
        }

        /// <summary>
        /// Executed when the <see cref="Task"/> this job represents has completed.
        /// Sets Stopped for a cancelled task, Failed (with an error record) for a faulted one,
        /// and otherwise Completed after writing any non-null result to <see cref="Job.Output"/>.
        /// </summary>
        /// <param name="task"><see cref="_task"/> after completing.</param>
        private void OnTaskCompleted(Task task)
        {
            lock (_stateLock)
            {
                if (_isFinished)
                {
                    // StopJob already reported Stopped, or the job was disposed.
                    return;
                }

                _isFinished = true;

                if (task.IsCanceled)
                {
                    SetJobState(JobState.Stopped);
                }
                else
                {
                    try
                    {
                        // The task is complete, so this does not block; for a faulted task it
                        // rethrows the task's exception rather than an AggregateException wrapper.
                        task.GetAwaiter().GetResult();

                        var result = _resultProperty?.GetValue(task);
                        if (result != null)
                        {
                            Output.Add(PSObject.AsPSObject(result));
                        }

                        SetJobState(JobState.Completed);
                    }
                    catch (Exception haltingException)
                    {
                        Error.Add(new ErrorRecord(
                            haltingException,
                            "TaskException",
                            ErrorCategory.NotSpecified,
                            task)
                        {
                            ErrorDetails = new ErrorDetails($"An exception occurred in the task. {haltingException.Message}"),
                        });
                        SetJobState(JobState.Failed);
                    }
                }
            }

            TaskFinished?.Invoke(this, EventArgs.Empty);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment