Skip to content

Instantly share code, notes, and snippets.

@Stroniax
Created March 16, 2022 18:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Stroniax/0b2d28b15b3570a6a837159c74b7d08a to your computer and use it in GitHub Desktop.
Save Stroniax/0b2d28b15b3570a6a837159c74b7d08a to your computer and use it in GitHub Desktop.
PowerShell job to subscribe a continuation action or script for when an initial job completes.
// PowerShell continuation job allows you to subscribe a continuation to an existing job.
// The following example is a bit useless but demonstrates the functionality of the job.
// PS:\> function Register-ContinuationJob { param([Job]$Job, [ScriptBlock]$Continuation) process { $cont = [ContinuationJob]::new($Job, $Continuation); $PSCmdlet.JobRepository.Add($cont); $cont }
// PS:\> $WakeUpJob = Start-Job { while (-not (Test-Connection Server01 -Quiet -Count 1)) { Start-Sleep -Seconds 1 } }
// PS:\> $ShutdownJob = Register-ContinuationJob -Job $WakeUpJob -Continuation { Invoke-Command -computerName Server01 -ScriptBlock { 'Sending shut down' ; Stop-Computer } }
// PS:\> $WakeUpJob, $ShutdownJob | Receive-Job -Wait -AutoRemoveJob
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
#nullable enable
namespace PSSharp.Gists.ContinuationJob
{
public class ContinuationJob : Job
{
private static Func<Job, CancellationToken, ValueTask> CreateContinuationActionForScript(ScriptBlock continuation) =>
async (current, cancellation) =>
{
using var rs = RunspaceFactory.CreateRunspace(InitialSessionState.CreateDefault2());
rs.Open();
using var ps = PowerShell.Create(rs);
ps.Streams.Debug = current.Debug;
ps.Streams.Warning = current.Warning;
ps.Streams.Error = current.Error;
ps.Streams.Verbose = current.Verbose;
ps.Streams.Progress = current.Progress;
var settings = new PSInvocationSettings()
{
RemoteStreamOptions = 0,
};
ValueTask stopping = ValueTask.CompletedTask;
await using var reg = cancellation.Register(
() =>
{
stopping = new(ps.StopAsync(null, null));
});
try
{
await ps.AddScript("& $args[0]").AddArgument(continuation).InvokeAsync<PSObject, PSObject>(null, current.Output, settings, null, null);
}
finally
{
reg.Unregister();
await stopping;
}
};
private static async void WaitContinuation(object? state, bool cancelled)
{
var job = (ContinuationJob)state!;
try
{
await job._continuation(job, job._cts.Token);
if (job.JobStateInfo.State == JobState.Stopping)
{
job.SetJobState(JobState.Stopped);
}
else
{
job.SetJobState(JobState.Completed);
}
}
catch (Exception ex)
{
if (ex is OperationCanceledException && job._cts.IsCancellationRequested)
{
job.SetJobState(JobState.Stopped);
}
else
{
job.Error.Add(
new ErrorRecord(
ex,
"ContinuationException",
ErrorCategory.NotSpecified,
job
)
);
job.SetJobState(JobState.Failed);
}
}
}
public ContinuationJob(Job preceedingJob, Func<Job, CancellationToken, ValueTask> continuation)
{
_preceedingJob = preceedingJob;
_continuation = continuation;
_continuationRegistration = ThreadPool.RegisterWaitForSingleObject(preceedingJob.Finished, WaitContinuation, this, -1, true);
_cts = new();
_cts.Token.Register(() => _continuationRegistration.Unregister(_preceedingJob.Finished));
}
public ContinuationJob(Job preceedingJob, ScriptBlock continuation)
: this(preceedingJob, CreateContinuationActionForScript(continuation))
{
}
private readonly RegisteredWaitHandle _continuationRegistration;
private readonly Func<Job, CancellationToken, ValueTask> _continuation;
private readonly CancellationTokenSource _cts;
private readonly Job _preceedingJob;
public override string Location => _preceedingJob.Location;
public override bool HasMoreData => false;
public override string StatusMessage => "";
public override void StopJob()
{
_cts.Cancel();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment