@scriptingstudio
Last active January 14, 2023 06:33
Foreach-Parallel
<#
.SYNOPSIS
Performs parallel processing of input objects.
.DESCRIPTION
Simple PowerShell function to asynchronously run jobs.
Yet another parallel processing manager on Earth.
FEATURES:
- no modules, just one small self-sufficient function
- automatic scriptblock variables: $_, $psitem
- automatic scriptblock parameters
  - $taskObject - contains the current task object which provides its auto timing
  - $inputObject - contains the current pipe object
- runtime controls
  - granular timing: item timer, batch timer
  - task auto timing by $taskObject - item timeout control
  - delay before finishing starts: useful on big collections
- multiple scripts
- import of external variables, functions, modules
- import of function files
- support of "using:" scope modifier
- runspace options
- asynchronous job finishing
- smart ThrottleLimit parameter
- input object enumerator
- experimental parameters
.PARAMETER InputObject
Specifies the input objects. The cmdlet runs the script block on each input object. Enter a variable that contains the objects, or type a command or expression that gets the objects.
When you use the InputObject parameter instead of piping command results to the cmdlet, the InputObject value is treated as a single object. To process every element of InputObject separately, apply the Enumerate parameter.
.PARAMETER Enumerate
Modifier for the InputObject value. When the InputObject parameter is used instead of piping command results to the cmdlet, the InputObject value is enumerated and each element is processed as a separate input object.
.PARAMETER ScriptBlock
Specifies the script block to be used for parallel processing of input objects. Enter a script block that describes the operation.
.PARAMETER RemainingScripts
Experimental. Specifies all script blocks that are not taken by the ScriptBlock parameter.
.PARAMETER ThrottleLimit
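Specifies the number of script blocks that run in parallel. Input objects are blocked until the running script block count falls below the ThrottleLimit. The default value is NUMBER_OF_PROCESSORS + 2; any value from -1 to 1 falls back to this default. A value less than -1 is treated as a multiplier: the limit becomes the absolute value multiplied by NUMBER_OF_PROCESSORS.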
.PARAMETER Parameters
Specifies a hash table of named parameters to be passed to the ScriptBlock. Each parameter is specified in the form "name = value".
.PARAMETER Switches
Specifies the names of the switch parameters to be passed to the ScriptBlock.
.PARAMETER Variables
Specifies an array of variables by name to be imported into the background runspace job.
.PARAMETER Functions
Specifies an array of functions by name to be imported into the background runspace job.
.PARAMETER FunctionFiles
Specifies a collection of files containing custom functions to be made available in the ScriptBlock. The content of the files is dot-sourced into the ScriptBlock.
.PARAMETER Modules
Specifies a collection of modules to be imported into the background runspace job.
.PARAMETER UseNewRunspace
Causes the parallel invocation to create a new runspace for each loop iteration instead of reusing runspaces from the runspace pool.
.PARAMETER Mta
Causes the parallel invocation to use the multithreaded apartment (MTA) threading model.
.PARAMETER LocalScope
Causes the parallel invocation to use local scope to run the ScriptBlock.
.PARAMETER PSCore
Loads only the Microsoft.PowerShell.Core module into the runspaces instead of the full default session state.
.PARAMETER TimeoutSeconds
Item timer. Specifies the number of seconds to wait for each input to be processed in parallel. After the specified timeout, the running script is stopped and its output is ignored. The default value of 0 disables the timeout handling.
The timer starts when the ScriptBlock starts. The BatchTimeoutSeconds parameter takes precedence over TimeoutSeconds.
.PARAMETER BatchTimeoutSeconds
Batch timer. Specifies the number of seconds to wait for all input to be processed in parallel. After the specified timeout, all running scripts are stopped. The default value of 0 disables the timeout handling.
The timer starts after all input objects are queued. The BatchTimeoutSeconds parameter takes precedence over TimeoutSeconds.
.PARAMETER WorkDelayMs
Experimental. Workflow pause timer. Specifies the number of milliseconds to wait before the finishing manager starts. A value of 200 or less disables the delay. This parameter can be useful on big collections of input objects to reduce overall run time.
.PARAMETER ShowErrors
Experimental. Displays the errors collected in a task's error stream when the task finishes. This parameter can be used as a debugging aid when you are running the ScriptBlock and need to troubleshoot it.
.PARAMETER NoRot
Experimental. Suppresses the ROT status objects that are otherwise returned for script blocks that exceeded the TimeoutSeconds value. ROT stands for R[un] O[ut of] T[ime].
.INPUTS
PSObject
You can pipe any object to this cmdlet.
.OUTPUTS
PSObject
This cmdlet returns objects that are determined by the input.
.EXAMPLE
Demonstrates how to set various options to pass them to the Runspace scope:
# dotsource.ps1 content:
#function demo1 {
# Write-Host 'dotsource from file' -ForegroundColor Yellow
#}
$message = 'Hello world from {0}'
function fibonacci ([double]$fn, [switch]$series) {
try {
$fbcache = [System.Collections.Generic.Dictionary[Double,Double]]::new($fn)
} catch {
Write-Warning 'Stack overflow'
return
}
$fbcache.add(0,0)
$fbcache.add(1,1)
if ($fn -lt 2) {
return $fbcache
}
try {
for ([double]$i=2; $i -le $fn; $i++) {
$fbcache[$i] = $fbcache[$i-1] + $fbcache[$i-2]
}
} catch {Write-Warning 'Stack overflow'}
if ($series) {$fbcache} else {$fbcache[$fn]}
} # END fibonacci
$cmdletparams = @{
Parameters = @{sbpar=12}
FunctionFiles = 'dotsource.ps1'
RemainingScripts = {Write-Host 'RemainingScripts' -f Green; [datetime]::now}
ShowErrors = $true
Mta = $true
Functions = 'fibonacci'
}
[pscustomobject]@{a='Atlanta'; b=23444},
[pscustomobject]@{a='New York'; b=503255},
[pscustomobject]@{a='Chicago'; b=278200} |
Foreach-Parallel -script {
param ($sbpar)
$using:message -f [runspace]::DefaultRunspace.InstanceId
demo1
'{0,-9} : {1,-9} {2}' -f $_.a, $psitem.b, $($_.b/$sbpar)
$fn = Get-Random -Minimum 100 -Maximum 1000
'Fibonacci({1}) = {0}' -f (fibonacci $fn),$fn
Start-Sleep (Get-Random -Minimum 1 -Maximum 5)
} @cmdletparams
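.EXAMPLE
A minimal sketch, not taken from the original examples, of how the Enumerate switch changes the handling of a non-piped InputObject. The range 1..3 is an arbitrary illustration value; output order is not guaranteed because the tasks run in parallel:
# without Enumerate the whole array is handed to a single task
Foreach-Parallel -InputObject (1..3) -ScriptBlock {"items in this task: $(@($_).Count)"}
# with Enumerate every element becomes its own task
Foreach-Parallel -InputObject (1..3) -Enumerate -ScriptBlock {"item: $_"}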
.LINK
https://gist.github.com/scriptingstudio/a1ce247fd1d6a75996f98ed9f578c10a
https://learn.microsoft.com/en-us/dotnet/api/system.management.automation.powershell?view=powershellsdk-7.0.0
.NOTES
Version: 2.6.0
Parameter usage classification:
- Essential (mandatory) parameters are InputObject, ScriptBlock
- Secondary control parameters are Parameters, Switches, Variables, Functions
- Fine tune parameters are ThrottleLimit, TimeoutSeconds, BatchTimeOutSeconds
- Advanced user parameters are ShowErrors, Mta, UseNewRunspace, PSCore, LocalScope
- Experimental parameters are RemainingScripts, NoRot, WorkDelayMs
#>
function Foreach-Parallel {
[CmdletBinding(PositionalBinding=$false)]
[alias('Invoke-Parallel','Invoke-Async','Invoke-RSJob','ipar', 'iasync','irsj')]
param (
[Parameter(Mandatory, ValueFromPipeline)]
[object]$InputObject,
[Parameter(Mandatory, Position=0)]
[alias('command')][scriptblock]$ScriptBlock,
[alias('expand')][switch]$Enumerate, # $InputObject modifier
# scriptblock environment
[hashtable]$Parameters = @{}, # user-defined named parameters; name=value
[string[]]$Switches, # user-defined bool parameters; name
[string[]]$Variables, # variables to import; name
[string[]]$Functions, # functions to import; name
[string[]]$FunctionFiles, # external functions to import; filepath
[string[]]$Modules, # modules to import; name
[scriptblock[]]$RemainingScripts, # experimental (relevant to the goal of the function?)
#[scriptblock]$MiddleScript, # experimental; async payload (relevant to the goal of the function?)
# runspace options
[alias('maxthreads')][int]$ThrottleLimit,
[switch]$UseNewRunspace,
[switch]$Mta,
[switch]$LocalScope, # experimental
[switch]$PSCore,
#[switch]$Capture, # experimental; automatically captures inputs and results
# runtime controls
[int]$TimeoutSeconds,
[alias('globalTimeOutSeconds')]
[int]$BatchTimeOutSeconds,
[int]$WorkDelayMs, # delay before finishing starts; experimental
[switch]$NoRot, # experimental
[switch]$ShowErrors # experimental
)
begin {
# Adjust defaults
if ($null -eq $Parameters) {$Parameters = @{}} # $null on the left avoids collection-filtering semantics
# "using:" variables
$usingParams = @{}
$ast = $ScriptBlock.Ast.FindAll({$args[0] -is [System.Management.Automation.Language.UsingExpressionAst]}, $true)
foreach ($usingstatement in $ast) {
$varText = $usingstatement.Extent.Text
$varPath = $usingstatement.SubExpression.VariablePath.UserPath
$key = [Convert]::ToBase64String([System.Text.Encoding]::Unicode.GetBytes($varText.ToLower()))
if (-not $usingParams.ContainsKey($key)) {
$usingParams.Add($key, $PSCmdlet.SessionState.PSVariable.GetValue($varPath))
}
}
if ($usingParams.Count) {
$Parameters['--%'] = $usingParams
}
# Scriptblock version 3
# inject automatic scriptblock parameters: $inputobject, $taskobject
# and automatic variables: $_, $psitem
$timing = '$taskobject.start = [datetime]::now'
$autovars = '$_ = $psitem = $inputobject'
$sbparams = $ScriptBlock.Ast.ParamBlock.Extent.Text
if ($sbparams) {
$sbdefinition = "$ScriptBlock".replace($sbparams,'').trim()
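# append the missing automatic parameters before the closing parenthesis only,
# because attributes and default values may contain ")" themselves
$hasParams = $ScriptBlock.Ast.ParamBlock.Parameters.Count -gt 0
foreach ($p in '$taskobject','$inputobject') {
if ($sbparams -notmatch "\$p\b") {
$sep = if ($hasParams) {', '} else {''} # no leading comma for an empty param()
$sbparams = $sbparams.Insert($sbparams.LastIndexOf(')'), "$sep$p")
$hasParams = $true
}
}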
} else {
$sbdefinition = "$ScriptBlock"
$sbparams = 'param ($inputobject, $taskobject)'
}
if ($RemainingScripts) { # experimental
$sbdefinition = $sbdefinition, ($RemainingScripts.foreach{"$_"} -join "`n") -join "`n"
}
$funcdefinition = if ($FunctionFiles) {
($FunctionFiles | ForEach-Object {
if ($_ -and (Test-Path $_)) {Get-Content $_ -Raw}
}) -join "`n"
}
# rebuild the scriptblock
$ScriptBlock = [scriptblock]::Create(
($sbparams, $timing, $autovars, $funcdefinition, $sbdefinition -join "`n")
)
# Initialize rspool
$iss = if ($PSCore) {
[InitialSessionState]::CreateDefault2()
} else {
[InitialSessionState]::CreateDefault()
}
$taskpool = [System.Collections.Generic.List[object]]::new()
foreach ($v in $Variables) {
if (-not $v) {continue}
$iss.Variables.Add([System.Management.Automation.Runspaces.SessionStateVariableEntry]::new($v, (Get-Variable $v).value, $null))
}
foreach ($f in $Functions) {
if (-not $f) {continue}
$def = (Get-Command $f).Definition
$iss.Commands.Add([System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new($f, $def))
}
foreach ($m in $Modules) {
if (-not $m) {continue}
try {$null = $iss.ImportPSModule($m)} catch {}
}
if ($ThrottleLimit -lt -1) {$ThrottleLimit = -$ThrottleLimit * [int]$env:NUMBER_OF_PROCESSORS}
elseif ($ThrottleLimit -lt 2) {$ThrottleLimit = 2 + [int]$env:NUMBER_OF_PROCESSORS}
$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
if ($Mta) {$pool.ApartmentState = 'MTA'}
$pool.ThreadOptions = if ($UseNewRunspace) {'UseNewThread'} else {'ReuseThread'}
$pool.Open()
} # begin
process {
#$inputs = [System.Management.Automation.PSDataCollection[object]]::new()
#$results = [System.Management.Automation.PSDataCollection[object]]::new()
$mod = if ($Enumerate) {@{}} else {@{NoEnumerate=$true}}
Write-Output -inputObject $inputObject @mod | . { process {
if ($null -eq $_) {return} # a rare use case when InputObject is not piping
$job = @{ # task control
Instance = $null # task pipeline
Handle = $null # pipeline handler
Start = $null # actual pipeline start time; initializes automatically upon start
Input = $_
#Results = $results
}
$pspipe = [powershell]::Create().AddScript($ScriptBlock,$LocalScope)
$pspipe.RunspacePool = $pool
# scriptblock parameters
$Parameters['taskObject'] = $job # automatic parameter
$Parameters['inputObject'] = $_ # automatic parameter
$null = $pspipe.AddParameters($Parameters)
if ($Switches) {
$Switches | Foreach-Object {$null = $pspipe.AddParameter($_)}
}
# ready to run
$job.Instance = $pspipe
$job.Handle = $pspipe.BeginInvoke()
#if ($Capture) {
# $job.Handle = $pspipe.BeginInvoke($inputs,$results) # experimental
#} else {}
$taskpool.Add($job) # register the task
}}
} # process
end {
#if ($MiddleScript) {$null = & $MiddleScript}
# finishing delay; it may accelerate a workflow on big collections
if ($workDelayMs -gt 200) {Start-Sleep -Milliseconds $workDelayMs}
# adjust timeout controls
$startTime = [datetime]::now # for batch timeout control; is here a right place?
if ($TimeoutSeconds -lt 0) {$TimeoutSeconds = -$TimeoutSeconds}
if ($batchTimeOutSeconds -lt 0) {$batchTimeOutSeconds = -$batchTimeOutSeconds}
if ($batchTimeOutSeconds) {$TimeOutSeconds = $batchTimeOutSeconds}
# polling task controls
While ( $taskpool.Where{$_.Handle -ne $null}.count -gt 0 ) {
ForEach ( $job in ($taskpool.Where{$_.Handle.IsCompleted -eq $true}) ) {
$job.Instance.EndInvoke($job.Handle) # task results
if ($job.Instance.HadErrors -and $ShowErrors) { # TODO: needs refactor
$job.Instance.Streams.Error | Out-Default
}
$job.Instance.Dispose()
$job.Handle = $null # this prevents endless looping
$null = $taskpool.remove($job) # this reduces loop cycle
}
#Start-Sleep -Milliseconds 100 # polling deceleration
# timeout handler
if ($TimeoutSeconds -lt 1) {continue}
ForEach ( $job in ($taskpool.Where{$_.Handle.IsCompleted -eq $false}) ) {
if ($batchTimeOutSeconds) {$job.start = $startTime}
if ($job.start -and (New-TimeSpan -Start $job.start).TotalSeconds -gt $TimeoutSeconds) {
if (-not $batchTimeOutSeconds -and -not $NoRot) { # TODO: needs refactor
[pscustomobject]@{
Status = 'ROT' # R[un] O[ut of] T[ime]
Task = $job.Input
Results = $job.results
Errors = $job.Instance.Streams.Error
}
}
##$job.Instance.Stop()
$job.Instance.Dispose()
$job.Handle = $null # this prevents endless looping
$null = $taskpool.remove($job)
}
}
} # polling loop
$pool.Close()
$pool.Dispose()
$taskpool.Clear()
} # end
} # END Foreach-Parallel
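# SKETCH (an illustration, not part of the original function): how the begin block
# locates "using:" variables in a script block. The names $city, $sb and $usingInfo
# are hypothetical; the AST search is the same FindAll call the function uses.
$city = 'Atlanta'
$sb = {'Hello from ' + $using:city} # only parsed here, never invoked
$usingInfo = $sb.Ast.FindAll({$args[0] -is [System.Management.Automation.Language.UsingExpressionAst]}, $true) |
    ForEach-Object {
        $name = $_.SubExpression.VariablePath.UserPath
        [pscustomobject]@{
            Text  = $_.Extent.Text                          # expression as written in the script block
            Name  = $name                                   # variable name without the scope modifier
            Value = Get-Variable -Name $name -ValueOnly     # value read from the current scope
        }
    }
$usingInfo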
# DEMO
<#
dotsource.ps1 content:
function demo1 {
Write-Host 'dotsource from file' -ForegroundColor Yellow
}
#>
$message = 'Hello world from {0}'
$cmdletparams = @{
Parameters = @{test=12}
FunctionFiles = 'dotsource.ps1'
RemainingScripts = {Write-Host 'RemainingScripts' -f Green; [datetime]::now}
ShowErrors = $true
Mta = $true
}
[pscustomobject]@{a='Atlanta'; b=23444},
[pscustomobject]@{a='New York'; b=503255},
[pscustomobject]@{a='Chicago'; b=278200} |
Foreach-Parallel -script {
param ($test)
$using:message -f [runspace]::DefaultRunspace.InstanceId
demo1
'{0,-9} : {1,-9} {2}' -f $_.a, $psitem.b, $($_.b/$test)
Start-Sleep (Get-Random -Minimum 1 -Maximum 5)
} @cmdletparams
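# DEMO: item timeout (a hedged sketch, not part of the original demo)
# A task that runs longer than TimeoutSeconds is expected to come back as a status
# object with Status = 'ROT' unless NoRot is set; faster tasks return their normal output.
# The sleep durations and the 2-second limit are arbitrary illustration values.
1, 5 | Foreach-Parallel -TimeoutSeconds 2 -ScriptBlock {
    Start-Sleep -Seconds $_
    "slept $_ s"
}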