Created
March 16, 2022 18:30
-
-
Save Stroniax/0b2d28b15b3570a6a837159c74b7d08a to your computer and use it in GitHub Desktop.
PowerShell job to subscribe a continuation action or script for when an initial job completes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A PowerShell continuation job lets you attach a continuation to an existing job; the continuation runs once that job finishes.
// The following example is contrived, but it demonstrates how the job is used.
// PS:\> function Register-ContinuationJob { [CmdletBinding()] param([Job]$Job, [ScriptBlock]$Continuation) process { $cont = [ContinuationJob]::new($Job, $Continuation); $PSCmdlet.JobRepository.Add($cont); $cont } }
// PS:\> $WakeUpJob = Start-Job { while (-not (Test-Connection Server01 -Quiet -Count 1)) { Start-Sleep -Seconds 1 } }
// PS:\> $ShutdownJob = Register-ContinuationJob -Job $WakeUpJob -Continuation { Invoke-Command -ComputerName Server01 -ScriptBlock { 'Sending shut down' ; Stop-Computer } }
// PS:\> $WakeUpJob, $ShutdownJob | Receive-Job -Wait -AutoRemoveJob
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Management.Automation; | |
using System.Management.Automation.Runspaces; | |
#nullable enable | |
namespace PSSharp.Gists.ContinuationJob
{
    /// <summary>
    /// A PowerShell <see cref="Job"/> that waits for a preceding job's
    /// <see cref="Job.Finished"/> handle to be signaled and then runs a continuation.
    /// </summary>
    /// <remarks>
    /// State transitions performed by this class: the job becomes <see cref="JobState.Running"/>
    /// when the continuation starts, and ends as <see cref="JobState.Completed"/>,
    /// <see cref="JobState.Stopped"/> (a stop was requested) or <see cref="JobState.Failed"/>
    /// (the continuation threw; the exception is added to the Error stream).
    /// </remarks>
    public class ContinuationJob : Job
    {
        // Values of _continuationClaim. Whoever flips it from Pending to Claimed
        // (the wait callback, StopJob, or Dispose) owns the job's terminal state transition.
        private const int ContinuationPending = 0;
        private const int ContinuationClaimed = 1;

        private readonly RegisteredWaitHandle _continuationRegistration;
        private readonly Func<Job, CancellationToken, ValueTask> _continuation;
        private readonly CancellationTokenSource _cts;
        private readonly Job _preceedingJob;
        private int _continuationClaim;

        /// <summary>
        /// Creates a job that invokes <paramref name="continuation"/> after
        /// <paramref name="preceedingJob"/> signals its <see cref="Job.Finished"/> handle.
        /// </summary>
        /// <param name="preceedingJob">The job whose completion triggers the continuation.</param>
        /// <param name="continuation">
        /// Delegate to run. It receives this <see cref="ContinuationJob"/> instance and a token
        /// that is cancelled when <see cref="StopJob"/> is called.
        /// </param>
        /// <exception cref="ArgumentNullException">Either argument is <see langword="null"/>.</exception>
        public ContinuationJob(Job preceedingJob, Func<Job, CancellationToken, ValueTask> continuation)
        {
            _preceedingJob = preceedingJob ?? throw new ArgumentNullException(nameof(preceedingJob));
            _continuation = continuation ?? throw new ArgumentNullException(nameof(continuation));

            // Must exist before the wait is registered: if the preceding job has already
            // finished, the callback can run on a pool thread before this constructor returns.
            _cts = new();

            _continuationRegistration = ThreadPool.RegisterWaitForSingleObject(
                preceedingJob.Finished,
                WaitContinuation,
                this,
                Timeout.Infinite,
                true);
        }

        /// <summary>
        /// Creates a job that invokes the script block <paramref name="continuation"/> after
        /// <paramref name="preceedingJob"/> signals its <see cref="Job.Finished"/> handle.
        /// </summary>
        /// <param name="preceedingJob">The job whose completion triggers the continuation.</param>
        /// <param name="continuation">The script block to invoke.</param>
        /// <exception cref="ArgumentNullException">Either argument is <see langword="null"/>.</exception>
        public ContinuationJob(Job preceedingJob, ScriptBlock continuation)
            : this(
                preceedingJob,
                CreateContinuationActionForScript(
                    continuation ?? throw new ArgumentNullException(nameof(continuation))))
        {
        }

        /// <summary>The location reported by the preceding job.</summary>
        public override string Location => _preceedingJob.Location;

        /// <summary>Always reports <see langword="false"/>.</summary>
        // NOTE(review): a script continuation writes into this job's own streams, so a value
        // derived from the stream counts may be more accurate — confirm how consumers use this.
        public override bool HasMoreData => false;

        /// <summary>Always an empty string.</summary>
        public override string StatusMessage => "";

        /// <summary>
        /// Requests cancellation. If the continuation has not started yet it will never run and
        /// the job moves to <see cref="JobState.Stopped"/> immediately; otherwise the running
        /// continuation observes the cancelled token and the job's final state is set when it ends.
        /// </summary>
        public override void StopJob()
        {
            _cts.Cancel();

            if (Interlocked.CompareExchange(ref _continuationClaim, ContinuationClaimed, ContinuationPending)
                == ContinuationPending)
            {
                // Unregister's argument is a handle to signal once unregistration completes, so
                // pass null: handing it the preceding job's Finished handle would signal that job
                // as finished.
                _continuationRegistration.Unregister(null);
                SetJobState(JobState.Stopped);
            }
        }

        /// <summary>
        /// Releases the thread-pool wait registration and the cancellation source, then
        /// disposes the base job.
        /// </summary>
        /// <param name="disposing">
        /// <see langword="true"/> when called from <c>Dispose()</c>; managed resources are only
        /// released in that case.
        /// </param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Take the claim so a callback racing with disposal becomes a no-op.
                Interlocked.Exchange(ref _continuationClaim, ContinuationClaimed);
                _continuationRegistration.Unregister(null);
                _cts.Dispose();
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Wraps a script block in a continuation delegate. The delegate opens a new runspace,
        /// binds the PowerShell data streams to those of the job it is given, and invokes the
        /// script with its output directed to that job's Output collection.
        /// </summary>
        /// <param name="continuation">The script block to invoke.</param>
        /// <returns>A delegate suitable for the delegate-based constructor.</returns>
        private static Func<Job, CancellationToken, ValueTask> CreateContinuationActionForScript(ScriptBlock continuation) =>
            async (current, cancellation) =>
            {
                using var rs = RunspaceFactory.CreateRunspace(InitialSessionState.CreateDefault2());
                rs.Open();
                using var ps = PowerShell.Create(rs);

                // Records the script writes go straight into the job's stream collections.
                ps.Streams.Debug = current.Debug;
                ps.Streams.Warning = current.Warning;
                ps.Streams.Error = current.Error;
                ps.Streams.Verbose = current.Verbose;
                ps.Streams.Progress = current.Progress;
                ps.Streams.Information = current.Information;

                var settings = new PSInvocationSettings
                {
                    RemoteStreamOptions = 0,
                };

                // Replaced by the in-flight stop operation if cancellation fires.
                Task stopping = Task.CompletedTask;
                var reg = cancellation.Register(() => stopping = ps.StopAsync(null, null));
                try
                {
                    // If the token was cancelled before the callback was registered, nothing
                    // was running for the callback to stop, so do not start the script at all.
                    cancellation.ThrowIfCancellationRequested();

                    // NOTE(review): the script block is passed by reference into a different
                    // runspace than the one it was created in — confirm this is intended.
                    await ps.AddScript("& $args[0]")
                        .AddArgument(continuation)
                        .InvokeAsync<PSObject, PSObject>(null, current.Output, settings, null, null);
                }
                finally
                {
                    // DisposeAsync waits for a callback that is currently executing, so
                    // 'stopping' has its final value before it is awaited.
                    await reg.DisposeAsync();
                    await stopping;
                }
            };

        /// <summary>
        /// Thread-pool callback invoked when the preceding job's Finished handle is signaled.
        /// Runs the continuation and translates its outcome into the job's final state.
        /// </summary>
        /// <param name="state">The <see cref="ContinuationJob"/> passed at registration.</param>
        /// <param name="cancelled">
        /// Unused. The wait is registered with an infinite timeout in the constructor.
        /// </param>
        /// <remarks>
        /// Must be <c>async void</c> to match the wait callback delegate, so every failure is
        /// handled here rather than allowed to escape.
        /// </remarks>
        private static async void WaitContinuation(object? state, bool cancelled)
        {
            var job = (ContinuationJob)state!;

            // StopJob or Dispose got there first: the job's outcome is already decided.
            if (Interlocked.CompareExchange(ref job._continuationClaim, ContinuationClaimed, ContinuationPending)
                != ContinuationPending)
            {
                return;
            }

            try
            {
                try
                {
                    job.SetJobState(JobState.Running);
                    await job._continuation(job, job._cts.Token);

                    // A continuation that returns normally after a stop request still counts as stopped.
                    job.SetJobState(job._cts.IsCancellationRequested ? JobState.Stopped : JobState.Completed);
                }
                catch (Exception ex)
                {
                    // A stopped PowerShell invocation surfaces PipelineStoppedException rather
                    // than OperationCanceledException; both mean "stopped" once a stop was requested.
                    bool stopRequested = job._cts.IsCancellationRequested
                        && (ex is OperationCanceledException or PipelineStoppedException);

                    if (stopRequested)
                    {
                        job.SetJobState(JobState.Stopped);
                    }
                    else
                    {
                        job.Error.Add(
                            new ErrorRecord(
                                ex,
                                "ContinuationException",
                                ErrorCategory.NotSpecified,
                                job));
                        job.SetJobState(JobState.Failed);
                    }
                }
            }
            catch (ObjectDisposedException)
            {
                // Presumably the job was disposed while the continuation was in flight, leaving
                // nothing to report to. An exception escaping an async void method would be
                // rethrown on the thread pool, so it is deliberately dropped here.
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment