Last active
January 14, 2023 06:33
Foreach-Parallel
<#
.SYNOPSIS
Performs parallel processing of input objects.
.DESCRIPTION
Simple PowerShell function to asynchronously run jobs.
Yet another parallel processing manager on Earth.
FEATURES:
- no modules, just one small self-sufficient function
- automatic scriptblock variables: $_, $psitem
- automatic scriptblock parameters
    - $taskObject - contains the current task object which provides its auto timing
    - $inputObject - contains the current pipe object
- runtime controls
    - granular timing: item timer, batch timer
    - task auto timing by $taskObject - item timeout control
    - delay before finishing starts: useful on big collections
- multiple scripts
- import of external variables, functions, modules
- import of function files
- support of "using:" scope modifier
- runspace options
- asynchronous job finishing
- smart ThrottleLimit parameter
- input object enumerator
- experimental parameters
.PARAMETER InputObject
Specifies the input objects. The cmdlet runs the script block on each input object. Enter a variable that contains the objects, or type a command or expression that gets the objects.
When you use the InputObject parameter instead of piping command results to the cmdlet, the InputObject value is treated as a single object. To process each element of the InputObject value separately, use the Enumerate parameter.
.PARAMETER Enumerate
Modifier for the InputObject value. When the InputObject parameter is used instead of piping command results to the cmdlet, the InputObject value is enumerated and each element is processed as a separate object.
.PARAMETER ScriptBlock
Specifies the script block to be used for parallel processing of input objects. Enter a script block that describes the operation.
.PARAMETER RemainingScripts
Experimental. Specifies all script blocks that are not taken by the ScriptBlock parameter. They are appended to the ScriptBlock and run after it.
.PARAMETER ThrottleLimit
Specifies the number of script blocks that run in parallel. Input objects are blocked until the running script block count falls below the ThrottleLimit. The default value is NUMBER_OF_PROCESSORS + 2; values from -1 to 1 also fall back to this default. A value less than -1 is treated as a multiplier: the limit becomes the absolute value times NUMBER_OF_PROCESSORS.
.PARAMETER Parameters
Specifies a hash table of named parameters to be passed to the ScriptBlock. Each parameter is specified in the form "name = value".
.PARAMETER Switches
Specifies an array of switch parameter names to be passed to the ScriptBlock.
.PARAMETER Variables
Specifies an array of variables by name to be imported into the background runspace job.
.PARAMETER Functions
Specifies an array of functions by name to be imported into the background runspace job.
.PARAMETER FunctionFiles
Specifies a collection of files containing custom functions to be made available in the ScriptBlock. The content of each file is inserted at the beginning of the ScriptBlock, which has the effect of dot-sourcing it.
.PARAMETER Modules
Specifies a collection of modules to be imported into the background runspace job.
.PARAMETER UseNewRunspace
Causes the runspace pool to create a new thread for each invocation (ThreadOptions = UseNewThread) instead of reusing threads.
.PARAMETER Mta
Causes the parallel invocation to use the multithreaded apartment (MTA) threading model.
.PARAMETER LocalScope
Experimental. Causes the parallel invocation to use local scope to run the ScriptBlock.
.PARAMETER PSCore
Loads only the Microsoft.PowerShell.Core module into the runspaces.
.PARAMETER TimeoutSeconds
Item timer. Specifies the number of seconds to wait for each input to be processed in parallel. After the timeout expires, the running script is stopped and its output is ignored. The default value of 0 disables the timeout handling.
The timer starts when the ScriptBlock starts. The BatchTimeoutSeconds parameter takes precedence over TimeoutSeconds.
.PARAMETER BatchTimeoutSeconds
Batch timer. Specifies the number of seconds to wait for all input to be processed in parallel. After the timeout expires, all running scripts are stopped. The default value of 0 disables the timeout handling.
The timer starts after all input objects are queued. The BatchTimeoutSeconds parameter takes precedence over TimeoutSeconds.
.PARAMETER WorkDelayMs
Experimental. Workflow pause timer. Specifies the number of milliseconds to wait before the finishing manager starts. A value of 200 or less disables the delay. This parameter can be useful on big collections of input objects to reduce overall run time.
.PARAMETER ShowErrors
Experimental. Outputs the errors raised in the ScriptBlock. This parameter can be used as a debugging aid when you are running the ScriptBlock and need to troubleshoot it.
.PARAMETER NoRot
Experimental. Suppresses the status objects emitted for script blocks that exceeded the TimeoutSeconds value. ROT is for R[un] O[ut of] T[ime].
.INPUTS
PSObject
You can pipe any object to this cmdlet.
.OUTPUTS
PSObject
This cmdlet returns objects that are determined by the input.
.EXAMPLE
Demonstrates how to set various options to pass them to the Runspace scope:
# dotsource.ps1 content:
#function demo1 {
#    Write-Host 'dotsource from file' -ForegroundColor Yellow
#}
$message = 'Hello world from {0}'
function fibonacci ([double]$fn, [switch]$series) {
    try {
        $fbcache = [System.Collections.Generic.Dictionary[Double,Double]]::new($fn)
    } catch {
        Write-Warning 'Stack overflow'
        return
    }
    $fbcache.add(0,0)
    $fbcache.add(1,1)
    if ($fn -lt 2) {
        return $fbcache
    }
    try {
        for ([double]$i=2; $i -le $fn; $i++) {
            $fbcache[$i] = $fbcache[$i-1] + $fbcache[$i-2]
        }
    } catch {Write-Warning 'Stack overflow'}
    if ($series) {$fbcache} else {$fbcache[$fn]}
} # END fibonacci
$cmdletparams = @{
    Parameters       = @{sbpar=12}
    FunctionFiles    = 'dotsource.ps1'
    RemainingScripts = {Write-Host 'RemainingScripts' -f Green; [datetime]::now}
    ShowErrors       = $true
    Mta              = $true
    Functions        = 'fibonacci'
}
[pscustomobject]@{a='Atlanta'; b=23444},
[pscustomobject]@{a='New York'; b=503255},
[pscustomobject]@{a='Chicago'; b=278200} |
Foreach-Parallel -script {
    param ($sbpar)
    $using:message -f [runspace]::DefaultRunspace.InstanceId
    demo1
    '{0,-9} : {1,-9} {2}' -f $_.a, $psitem.b, $($_.b/$sbpar)
    $fn = Get-Random -Minimum 100 -Maximum 1000
    'Fibonacci({1}) = {0}' -f (fibonacci $fn),$fn
    Start-Sleep (Get-Random -Minimum 1 -Maximum 5)
} @cmdletparams
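.EXAMPLE
A minimal sketch of the Enumerate and ThrottleLimit parameters, assuming the function is already loaded in the session. The array is passed through InputObject instead of the pipeline, so Enumerate is needed to turn every element into a separate task; the range 1..5 and the limit of 3 are arbitrary illustrative values. Results are returned as tasks complete, so their order is not guaranteed to match the input order.
Foreach-Parallel -InputObject (1..5) -Enumerate -ThrottleLimit 3 -ScriptBlock {
    $_ * $_   # $_ holds the current element of the enumerated array
}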
.LINK
https://gist.github.com/scriptingstudio/a1ce247fd1d6a75996f98ed9f578c10a
https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.powershell?view=powershellsdk-7.0.0
.NOTES
Version: 2.6.0
Parameter usage classification:
- Essential (mandatory) parameters are InputObject, ScriptBlock
- Secondary control parameters are Parameters, Switches, Variables, Functions
- Fine tune parameters are ThrottleLimit, TimeoutSeconds, BatchTimeOutSeconds
- Advanced user parameters are ShowErrors, Mta, UseNewRunspace, PSCore, LocalScope
- Experimental parameters are RemainingScripts, NoRot, WorkDelayMs
#>
function Foreach-Parallel {
    [CmdletBinding(PositionalBinding=$false)]
    [alias('Invoke-Parallel','Invoke-Async','Invoke-RSJob','ipar', 'iasync','irsj')]
    param (
        [Parameter(Mandatory, ValueFromPipeline)]
        [object]$InputObject,
        [Parameter(Mandatory, Position=0)]
        [alias('command')][scriptblock]$ScriptBlock,
        [alias('expand')][switch]$Enumerate, # $InputObject modifier
        # scriptblock environment
        [hashtable]$Parameters = @{}, # user-defined named parameters; name=value
        [string[]]$Switches, # user-defined bool parameters; name
        [string[]]$Variables, # variables to import; name
        [string[]]$Functions, # functions to import; name
        [string[]]$FunctionFiles, # external functions to import; filepath
        [string[]]$Modules, # modules to import; name
        [scriptblock[]]$RemainingScripts, # experimental (relevant to the goal of the function?)
        #[scriptblock]$MiddleScript, # experimental; async payload (relevant to the goal of the function?)
        # runspace options
        [alias('maxthreads')][int]$ThrottleLimit,
        [switch]$UseNewRunspace,
        [switch]$Mta,
        [switch]$LocalScope, # experimental
        [switch]$PSCore,
        #[switch]$Capture, # experimental; automatically captures inputs and results
        # runtime controls
        [int]$TimeoutSeconds,
        [alias('globalTimeOutSeconds')]
        [int]$BatchTimeOutSeconds,
        [int]$WorkDelayMs, # delay before finishing starts; experimental
        [switch]$NoRot, # experimental
        [switch]$ShowErrors # experimental
    )
    begin {
        # Adjust defaults
        if ($null -eq $Parameters) {$Parameters = @{}}
        # "using:" variables: collect their values from the caller's session state
        $usingParams = @{}
        $ast = $ScriptBlock.Ast.FindAll({$args[0] -is [System.Management.Automation.Language.UsingExpressionAst]}, $true)
        foreach ($usingstatement in $ast) {
            $varText = $usingstatement.Extent.Text
            $varPath = $usingstatement.SubExpression.VariablePath.UserPath
            $key = [Convert]::ToBase64String([System.Text.Encoding]::Unicode.GetBytes($varText.ToLower()))
            if (-not $usingParams.ContainsKey($key)) {
                $usingParams.Add($key, $PSCmdlet.SessionState.PSVariable.GetValue($varPath))
            }
        }
        if ($usingParams.Count) {
            $Parameters['--%'] = $usingParams
        }
        # Scriptblock version 3
        # inject automatic scriptblock parameters: $inputobject, $taskobject
        # and automatic variables: $_, $psitem
        $timing = '$taskobject.start = [datetime]::now'
        $autovars = '$_ = $psitem = $inputobject'
        $sbparams = $ScriptBlock.Ast.ParamBlock.Extent.Text
        if ($sbparams) {
            $sbdefinition = "$ScriptBlock".replace($sbparams,'').trim()
            foreach ($p in '$taskobject','$inputobject') {
                if ($sbparams -notmatch "\$p") {
                    $sbparams = $sbparams.replace(")", ", $p)")
                }
            }
        } else {
            $sbdefinition = "$ScriptBlock"
            $sbparams = 'param ($inputobject, $taskobject)'
        }
        if ($RemainingScripts) { # experimental
            $sbdefinition = $sbdefinition, ($RemainingScripts.foreach{"$_"} -join "`n") -join "`n"
        }
        $funcdefinition = if ($FunctionFiles) {
            ($FunctionFiles | ForEach-Object {
                if ($_ -and (Test-Path $_)) {Get-Content $_ -Raw}
            }) -join "`n"
        }
        # rebuild the scriptblock
        $ScriptBlock = [scriptblock]::Create(
            ($sbparams, $timing, $autovars, $funcdefinition, $sbdefinition -join "`n")
        )
        # Initialize rspool
        $iss = if ($PSCore) {
            [InitialSessionState]::CreateDefault2()
        } else {
            [InitialSessionState]::CreateDefault()
        }
        $taskpool = [System.Collections.Generic.List[object]]::new()
        foreach ($v in $Variables) {
            if (-not $v) {continue}
            $iss.Variables.Add([System.Management.Automation.Runspaces.SessionStateVariableEntry]::new($v, (Get-Variable $v).value, $null))
        }
        foreach ($f in $Functions) {
            if (-not $f) {continue}
            $def = (Get-Command $f).Definition
            $iss.Commands.Add([System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new($f, $def))
        }
        foreach ($m in $Modules) {
            if (-not $m) {continue}
            try {$null = $iss.ImportPSModule($m)} catch {}
        }
        # smart ThrottleLimit: values below -1 multiply the processor count, values below 2 select the default
        if ($ThrottleLimit -lt -1) {$ThrottleLimit = -$ThrottleLimit * [int]$env:NUMBER_OF_PROCESSORS}
        elseif ($ThrottleLimit -lt 2) {$ThrottleLimit = 2 + [int]$env:NUMBER_OF_PROCESSORS}
        $pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
        if ($Mta) {$pool.ApartmentState = 'MTA'}
        $pool.ThreadOptions = if ($UseNewRunspace) {'UseNewThread'} else {'ReuseThread'}
        $pool.Open()
    } # begin
    process {
        #$inputs = [System.Management.Automation.PSDataCollection[object]]::new()
        #$results = [System.Management.Automation.PSDataCollection[object]]::new()
        $mod = if ($Enumerate) {@{}} else {@{NoEnumerate=$true}}
        Write-Output -inputObject $inputObject @mod | . { process {
            if ($null -eq $_) {return} # a rare use case when InputObject is not piped
            $job = @{ # task control
                Instance = $null # task pipeline
                Handle = $null # pipeline handler
                Start = $null # actual pipeline start time; initializes automatically upon start
                Input = $_
                #Results = $results
            }
            $pspipe = [powershell]::Create().AddScript($ScriptBlock,$LocalScope)
            $pspipe.RunspacePool = $pool
            # scriptblock parameters
            $Parameters['taskObject'] = $job # automatic parameter
            $Parameters['inputObject'] = $_ # automatic parameter
            $null = $pspipe.AddParameters($Parameters)
            if ($Switches) {
                $Switches | Foreach-Object {$null = $pspipe.AddParameter($_)}
            }
            # ready to run
            $job.Instance = $pspipe
            $job.Handle = $pspipe.BeginInvoke()
            #if ($Capture) {
            #    $job.Handle = $pspipe.BeginInvoke($inputs,$results) # experimental
            #} else {}
            $taskpool.Add($job) # register the task
        }}
    } # process
    end {
        #if ($MiddleScript) {$null = & $MiddleScript}
        # finishing delay; it may accelerate a workflow on big collections
        if ($workDelayMs -gt 200) {Start-Sleep -Milliseconds $workDelayMs}
        # adjust timeout controls
        $startTime = [datetime]::now # for batch timeout control; is this the right place?
        if ($TimeoutSeconds -lt 0) {$TimeoutSeconds = -$TimeoutSeconds}
        if ($batchTimeOutSeconds -lt 0) {$batchTimeOutSeconds = -$batchTimeOutSeconds}
        if ($batchTimeOutSeconds) {$TimeOutSeconds = $batchTimeOutSeconds}
        # polling task controls
        While ( $taskpool.Where{$null -ne $_.Handle}.count -gt 0 ) {
            ForEach ( $job in ($taskpool.Where{$_.Handle.IsCompleted -eq $true}) ) {
                $job.Instance.EndInvoke($job.Handle) # task results
                if ($job.Instance.HadErrors -and $ShowErrors) { # TODO: needs refactor
                    $job.Instance.Streams.Error | Out-Default
                }
                $job.Instance.Dispose()
                $job.Handle = $null # this prevents endless looping
                $null = $taskpool.remove($job) # this reduces loop cycle
            }
            #Start-Sleep -Milliseconds 100 # polling deceleration
            # timeout handler
            if ($TimeoutSeconds -lt 1) {continue}
            ForEach ( $job in ($taskpool.Where{$_.Handle.IsCompleted -eq $false}) ) {
                if ($batchTimeOutSeconds) {$job.start = $startTime}
                if ($job.start -and (New-TimeSpan -Start $job.start).TotalSeconds -gt $TimeoutSeconds) {
                    if (-not $batchTimeOutSeconds -and -not $NoRot) { # TODO: needs refactor
                        [pscustomobject]@{
                            Status = 'ROT' # R[un] O[ut of] T[ime]
                            Task = $job.Input
                            Results = $job.results # stays $null while the Results key of $job is commented out
                            Errors = $job.Instance.Streams.Error
                        }
                    }
                    ##$job.Instance.Stop()
                    $job.Instance.Dispose()
                    $job.Handle = $null # this prevents endless looping
                    $null = $taskpool.remove($job)
                }
            }
        } # polling loop
        $pool.Close()
        $pool.Dispose()
        $taskpool.Clear()
    } # end
} # END Foreach-Parallel
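# ITEM TIMEOUT SKETCH
# A minimal illustration of the item timer, assuming the function above is loaded
# in the current session. The sleep durations, the 3-second limit and the $report
# variable are illustrative choices, not values taken from the original demo.
# The first task finishes within the limit and returns its string. The second one
# outlives TimeoutSeconds, so the polling loop disposes it and emits a status
# object with Status = 'ROT' whose Task property holds the input value.
# Adding -NoRot would suppress that status object.
$report = 1, 10 | Foreach-Parallel -TimeoutSeconds 3 -ScriptBlock {
    Start-Sleep -Seconds $_
    "slept $_ s"
}
# list the inputs that ran out of time
$report | Where-Object { $_.Status -eq 'ROT' } | ForEach-Object { "timed out: $($_.Task)" }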
# DEMO
<#
dotsource.ps1 content:
function demo1 {
    Write-Host 'dotsource from file' -ForegroundColor Yellow
}
#>
$message = 'Hello world from {0}'
$cmdletparams = @{
    Parameters       = @{test=12}
    FunctionFiles    = 'dotsource.ps1'
    RemainingScripts = {Write-Host 'RemainingScripts' -f Green; [datetime]::now}
    ShowErrors       = $true
    Mta              = $true
}
[pscustomobject]@{a='Atlanta'; b=23444},
[pscustomobject]@{a='New York'; b=503255},
[pscustomobject]@{a='Chicago'; b=278200} |
Foreach-Parallel -script {
    param ($test)
    $using:message -f [runspace]::DefaultRunspace.InstanceId
    demo1
    '{0,-9} : {1,-9} {2}' -f $_.a, $psitem.b, $($_.b/$test)
    Start-Sleep (Get-Random -Minimum 1 -Maximum 5)
} @cmdletparams
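# VARIABLES AND SWITCHES SKETCH
# A minimal illustration of importing a caller variable and passing a switch,
# assuming the function above is loaded. The names $prefix, $labels and the
# 'upper' switch are hypothetical and exist only for this sketch.
# -Variables copies $prefix into the runspace by name; -Switches sets the
# [switch] parameter that the script block declares in its own param block.
$prefix = 'item'
$labels = 'a', 'b' | Foreach-Parallel -Variables prefix -Switches upper -ScriptBlock {
    param ([switch]$upper)
    $text = "${prefix}-$_"                          # $prefix comes from the imported variable
    if ($upper) { $text.ToUpper() } else { $text }  # the switch is set, so the text is uppercased
}
$labels # results arrive in completion order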