提问人:Newtown 提问时间:8/30/2023 最后编辑:Newtown 更新时间:8/31/2023 访问量:101
Powershell 5 BeginInvoke 参数
Powershell 5 BeginInvoke Parameters
问:
我正在尝试了解 PS5 中的并行处理。一切都很简单,直到我用两个参数对 BeginInvoke 进行了重载:BeginInvoke<TInput,TOutput>。 在博客上找到了适用于 TOutput 参数的内容。
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = { Get-Random -Maximum 100 }
$Instances = (1..5) | ForEach-Object {
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
[PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs,$Outputs)
}
}
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
运行此操作并查看$Output会产生预期的结果:
PS C:\用户\xyz> $Outputs 10 74 41 56 59
现在,当我尝试通过 $Inputs 将某些内容传递给脚本块时,我从未成功。我知道您可以使用 AddParameters 来做到这一点,但我不喜欢不遗余力,并想了解如何使用此重载来做到这一点。现在花了一个星期的时间在网上查看资源,但找不到正确的方法。据我所知,这将通过管道传递,就像$Outputs在最后一样。这是一种行不通的方法(来自我尝试过的一千种方法...):
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
Bias = 100 }
)
$ScriptBlock = {
Param
(
#Hoping to get value from pipeline
[Parameter(Mandatory = $true, ValueFromPipeline = $true)]
[System.Management.Automation.PSDataCollection]$Bias
)
$BiasValue = [PSCustomObject]$Bias[0]
(Get-Random -Maximum 100) + $BiasValue[0].'Bias' }
#Create the threads
$Instances = (1..10) | ForEach-Object {
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
[PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs,$Outputs)
}
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
当然,这段代码并没有做任何有用的事情,它只是一个测试,以找出如何在 ScriptBlock 中获取$Inputs值。现在$Outputs完全是空的,指向 ScriptBlock 中的错误。
任何帮助将不胜感激。
答:
希望这个例子能帮助你更好地理解。代码的关键问题是脚本块缺少其块,脚本块的参数应该是 或者只是因为线程将从管道 () 接收的只是一个 ,它们不会接收整个 PSDataCollection<TInput>
。也很难给出 PowerShell 的示例,因为它是单线程的。process
psobject
object
TInput
pscustomobject
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = {
Param(
# all threads will receive this and process it in parallel
[Parameter(Mandatory = $true, ValueFromPipeline = $true)]
[psobject] $Bias
)
process {
[pscustomobject]@{
ThreadId = [runspace]::DefaultRunspace.Id
PipelineInput = $Bias
Result = (Get-Random -Maximum 100) + $Bias.Value
}
Start-Sleep 1
}
}
$jobs = [System.Collections.Generic.List[object]]::new()
#Create the threads
1..10 | ForEach-Object {
# simulate input from pipeline
$Inputs.Add([pscustomobject]@{ Value = $_ })
# now start processing
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
$jobs.Add([PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs, $Outputs)
})
# simulate output, ReadAll() will copy the output into a new
# collection that we can safely read and clear itself
if ($Outputs.Count) {
$Outputs.ReadAll()
}
}
$Inputs.Complete()
# now block until processing is done
do {
$id = [System.Threading.WaitHandle]::WaitAny($jobs.State.AsyncWaitHandle, 200)
# if there is any output from threads, consume it
if ($Outputs.Count) {
$Outputs.ReadAll()
}
if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
continue
}
$job = $jobs[$id]
$job.Instance.EndInvoke($job.State)
$job.Instance.Dispose()
$jobs.RemoveAt($id)
}
while ($jobs.Count)
$RunspacePool.Dispose()
要使代码正常工作,需要进行以下两项更改:
使用管道绑定参数进行定义,该参数接收 type parameter 的参数,而不是 type 的参数,并在块中处理该参数。
$ScriptBlock
[pscustomobject]
[System.Management.Automation.PSDataCollection[pscustomobject]]
process { ... }
管道绑定参数不接收集合,它们接收作为管道输入提供的集合的元素,因为集合(枚举)在管道中枚举,即它们的元素被逐个发送到管道。
为了确保处理每个管道输入对象,需要一个模块 - 请参阅管道对象到函数
process { ... }
调用
[System.Management.Automation.PSDataCollection'1]
集合,该集合用作管道输入,这是完成处理 (结束) 所必需的。.Complete()
以下是代码的修改版本(根据需要进行尽可能少的修改;可以进行其他改进) - 查找注释行以指示更改:# !!
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
Bias = 100 }
)
# !! Call .Complete() on the input collection to ensure that pipeline
# !! processing ends.
$Inputs.Complete()
$ScriptBlock = {
Param
(
[Parameter(Mandatory = $true, ValueFromPipeline = $true)]
[pscustomobject]$Bias # !! Define the parameter as [pscustomobject]
)
process { # !! Use a `process` block to process each input object.
(Get-Random -Maximum 100) + $Bias.Bias }
}
#Create the threads
$Instances = (1..10) | ForEach-Object {
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
[PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs, $Outputs)
}
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
# !! Print the output objects
$Outputs
# !! Clean up.
$Instances.Instance.Dispose()
$RunspacePool.Dispose()
请注意,这是线程安全的,因此多个运行空间共享同一个输出集合实例 .[System.Management.Automation.PSDataCollection`1]
$Outputs
评论