Powershell 5 BeginInvoke 参数

Powershell 5 BeginInvoke Parameters

提问人:Newtown 提问时间:8/30/2023 最后编辑:Newtown 更新时间:8/31/2023 访问量:101

问:

我正在尝试了解 PS5 中的并行处理。一切都很简单,直到我用两个参数对 BeginInvoke 进行了重载:BeginInvoke<TInput,TOutput>。 在博客上找到了适用于 TOutput 参数的内容。

$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$Inputs  = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'

$ScriptBlock = { Get-Random -Maximum 100 }

$Instances = (1..5) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}

while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }

运行此操作并查看$Output会产生预期的结果:

PS C:\用户\xyz> $Outputs 10 74 41 56 59

现在,当我尝试通过 $Inputs 将某些内容传递给脚本块时,我从未成功。我知道您可以使用 AddParameters 来做到这一点,但我不喜欢不遗余力,并想了解如何使用此重载来做到这一点。现在花了一个星期的时间在网上查看资源,但找不到正确的方法。据我所知,这将通过管道传递,就像$Outputs在最后一样。这是一种行不通的方法(来自我尝试过的一千种方法...):

$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$Inputs  = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'

#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
        Bias = 100 }
        )

$ScriptBlock = { 
    Param    
    (
    #Hoping to get value from pipeline
    [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
    [System.Management.Automation.PSDataCollection]$Bias
    )
    $BiasValue = [PSCustomObject]$Bias[0]
    (Get-Random -Maximum 100) + $BiasValue[0].'Bias' }

#Create the threads
$Instances = (1..10) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}

#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }

当然,这段代码并没有做任何有用的事情,它只是一个测试,以找出如何在 ScriptBlock 中获取$Inputs值。现在$Outputs完全是空的,指向 ScriptBlock 中的错误。

任何帮助将不胜感激。

PowerShell 输入 输出 begininvoke

评论

0赞 jdweng 8/30/2023
您正在执行数据库查询。当然,当您的查询找不到任何数据时,您会得到一个空响应。
2赞 Santiago Squarzon 8/30/2023
忽略上面的评论,他们只是发表评论,不知道他们在说什么。
0赞 jdweng 8/30/2023
您正在执行查询,但该查询未返回任何数据。您没有收到任何错误。System.Management.Automation 是一个数据库查询工具。第一个示例没有参数,只是获取脚本“Get-Random -Maximum 100”的结果。要有输入,脚本需要有输入。目前,脚本没有输入,因此向$Instance添加输入不会更改结果。
2赞 Doug Maurer 8/30/2023
@jdweng你似乎很困惑。learn.microsoft.com/en-us/dotnet/api/......
2赞 mklement0 8/31/2023
@jdweng:你困惑,正如这里多个用户的反馈所暗示的那样。鉴于这不是孤立的事件,让我再次为您提供以下建议:请仅在您 (a) 完全理解问题和 (b) 有足够的主题专业知识来提供有用信息的情况下发表评论。通常情况下,您的评论不仅无益,而且令人困惑尤其是在带有不相关链接的情况下(这里的情况并非如此)。

答:

0赞 Santiago Squarzon 8/30/2023 #1

希望这个例子能帮助你更好地理解。代码的关键问题是脚本块缺少其块,脚本块的参数应该是 或者只是因为线程将从管道 () 接收的只是一个 ,它们不会接收整个 PSDataCollection<TInput>。也很难给出 PowerShell 的示例,因为它是单线程的。processpsobjectobjectTInputpscustomobject

$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'

$ScriptBlock = {
    Param(
        # all threads will receive this and process it in parallel
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [psobject] $Bias
    )

    process {
        [pscustomobject]@{
            ThreadId      = [runspace]::DefaultRunspace.Id
            PipelineInput = $Bias
            Result        = (Get-Random -Maximum 100) + $Bias.Value
        }
        Start-Sleep 1
    }
}

$jobs = [System.Collections.Generic.List[object]]::new()
#Create the threads
1..10 | ForEach-Object {
    # simulate input from pipeline
    $Inputs.Add([pscustomobject]@{ Value = $_ })

    # now start processing
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    $jobs.Add([PSCustomObject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs, $Outputs)
    })

    # simulate output, ReadAll() will copy the output into a new
    # collection that we can safely read and clear itself
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }
}

$Inputs.Complete()

# now block until processing is done
do {
    $id = [System.Threading.WaitHandle]::WaitAny($jobs.State.AsyncWaitHandle, 200)
    # if there is any output from threads, consume it
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }

    if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
        continue
    }

    $job = $jobs[$id]
    $job.Instance.EndInvoke($job.State)
    $job.Instance.Dispose()
    $jobs.RemoveAt($id)
}
while ($jobs.Count)

$RunspacePool.Dispose()
0赞 mklement0 8/31/2023 #2

要使代码正常工作,需要进行以下两项更改:

  • 使用管道绑定参数进行定义,该参数接收 type parameter 的参数,而不是 type 的参数,并在块中处理该参数。$ScriptBlock[pscustomobject][System.Management.Automation.PSDataCollection[pscustomobject]]process { ... }

    • 管道绑定参数不接收集合,它们接收作为管道输入提供的集合元素,因为集合(枚举)在管道中枚举,即它们的元素逐个发送到管道。

    • 为了确保处理每个管道输入对象,需要一个模块 - 请参阅管道对象到函数process { ... }

  • 调用 [System.Management.Automation.PSDataCollection'1] 集合,该集合用作管道输入,这是完成处理 (结束) 所必需的。.Complete()

以下是代码的修改版本(根据需要进行尽可能少的修改;可以进行其他改进) - 查找注释行以指示更改:# !!

$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$Inputs  = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'

#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
        Bias = 100 }
        )

# !! Call .Complete() on the input collection to ensure that pipeline 
# !! processing ends.
$Inputs.Complete()

$ScriptBlock = { 
    Param    
    (
      [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
      [pscustomobject]$Bias # !! Define the parameter as [pscustomobject]
    )
    process { # !! Use a `process` block to process each input object.
      (Get-Random -Maximum 100) + $Bias.Bias }
    }

#Create the threads
$Instances = (1..10) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs, $Outputs)
    }
}

#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }

# !! Print the output objects
$Outputs

# !! Clean up.
$Instances.Instance.Dispose()
$RunspacePool.Dispose()

请注意,这是线程安全的,因此多个运行空间共享同一个输出集合实例 .[System.Management.Automation.PSDataCollection`1]$Outputs

评论

0赞 Newtown 9/1/2023
谢谢你的这个版本。起初我没有注意到的一件事是,在这两个答案中,每个线程都会自行处理所有输入。我发现使其工作的方法是将注释属性添加到名为 assigned 的输入中并检查它是否为 false,当线程抓住它时,我将其更改为 true。如果为 true,则立即返回并抓住下一个。