当所有工作都完成时,如何关闭由多个 goroutine 填充的通道?

How can I close a channel being populated by multiple goroutines when all work is complete?

提问人:AntonioR 提问时间:9/2/2023 最后编辑:AntonioR 更新时间:9/2/2023 访问量:62

问:

我正在尝试遵循 Go 的方式“不要通过共享内存进行交流;相反,通过通信共享内存“,并使用通道异步通信要完成的任务,并发回处理任务的结果。

为了简单起见,我将通道类型更改为 int,而不是它们真正的结构。并用 .time.Sleep()

如何在发回所有任务结果后立即关闭,以便此代码不会卡在最后?producedResultsfor

    quantityOfTasks:= 100
    quantityOfWorkers:= 60
    remainingTasks := make(chan int)
    producedResults := make(chan int)

    // produce tasks
    go func() {
        for i := 0; i < quantityOfTasks; i++ {
            remainingTasks <- 1
        }
        close(remainingTasks)
    }()

    // produce workers
    for i := 0; i < quantityOfWorkers; i++ {
        go func() {
            for taskSize := range remainingTasks {
                // simulate a long task
                time.Sleep(time.Second * time.Duration(taskSize))
                // return the result of the long task
                producedResults <- taskSize
            }
        }()
    }

    // read the results of the tasks and agregate them
    executedTasks := 0
    for resultOfTheTask := range producedResults { //this loop will never finish because producedResults never gets closed
        // consolidate the results of the tasks
        executedTasks += resultOfTheTask
    }
GO 异步 通道

评论


答:

4赞 Burak Serdar 9/2/2023 #1

您希望在所有写入该通道的 goroutine 返回后关闭该通道。为此,您可以使用 WaitGroup:

wg:=sync.WaitGroup{}

for i := 0; i < quantityOfWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for taskSize := range remainingTasks {
                //simulate a long task
                time.Sleep(time.Second * time.Duration(taskSize))
                //return the result of the long task
                producedResults <- taskSize
            }
        }()
}

go func() {
  wg.Wait()
  close(producedResults)
}()

0赞 Millie Osterman 9/2/2023 #2

等待工作人员完成后关闭通道。

var wg sync.WaitGroup
wg.Add(quantityOfWorkers)
go func() {
    wg.Wait()
    close(producedResults)
}()

for i := 0; i < quantityOfWorkers; i++ {
    go func() {
        defer wg.Done()
    ...

https://go.dev/play/p/kdgemc5Unsy

0赞 cod3rboy 9/2/2023 #3

只有在所有 worker goroutine 完成执行后,才应关闭通道。每个工作线程都可以通过在使用 A 时调用或关闭专用通道来发出完成信号。 Burak 已经给出了使用 .下面的代码显示了使用专用通道发出信号完成:producedResultsDone()WaitGroupWaitGroup

doneSigs := make([]chan struct{}, quantityOfWorkers)
// produce workers
for i := 0; i < quantityOfWorkers; i++ {
    done := make(chan struct{})
    doneSigs[i] = done
    go func() {
        defer close(done)
        for taskSize := range remainingTasks {
            // simulate a long task
            time.Sleep(time.Second * time.Duration(taskSize))
            // return the result of the long task
            producedResults <- taskSize
        }
    }()
}
go func() {
    defer close(producedResults)
    for i := range doneSigs {
        <-doneSigs[i]
    }
}()