提问人:AntonioR 提问时间:9/2/2023 最后编辑:AntonioR 更新时间:9/2/2023 访问量:62
当所有工作都完成时,如何关闭由多个 goroutine 填充的通道?
How can I close a channel being populated by multiple goroutines when all work is complete?
问:
我正在尝试遵循 Go 的方式“不要通过共享内存进行交流;相反,通过通信共享内存“,并使用通道异步通信要完成的任务,并发回处理任务的结果。
为了简单起见,我将通道类型更改为 int,而不是它们真正的结构。并用 .time.Sleep()
如何在发回所有任务结果后立即关闭,以便此代码不会卡在最后?producedResults
for
quantityOfTasks:= 100
quantityOfWorkers:= 60
remainingTasks := make(chan int)
producedResults := make(chan int)
// produce tasks
go func() {
for i := 0; i < quantityOfTasks; i++ {
remainingTasks <- 1
}
close(remainingTasks)
}()
// produce workers
for i := 0; i < quantityOfWorkers; i++ {
go func() {
for taskSize := range remainingTasks {
// simulate a long task
time.Sleep(time.Second * time.Duration(taskSize))
// return the result of the long task
producedResults <- taskSize
}
}()
}
// read the results of the tasks and agregate them
executedTasks := 0
for resultOfTheTask := range producedResults { //this loop will never finish because producedResults never gets closed
// consolidate the results of the tasks
executedTasks += resultOfTheTask
}
答:
4赞
Burak Serdar
9/2/2023
#1
您希望在所有写入该通道的 goroutine 返回后关闭该通道。为此,您可以使用 WaitGroup:
wg:=sync.WaitGroup{}
for i := 0; i < quantityOfWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for taskSize := range remainingTasks {
//simulate a long task
time.Sleep(time.Second * time.Duration(taskSize))
//return the result of the long task
producedResults <- taskSize
}
}()
}
go func() {
wg.Wait()
close(producedResults)
}()
0赞
Millie Osterman
9/2/2023
#2
等待工作人员完成后关闭通道。
var wg sync.WaitGroup
wg.Add(quantityOfWorkers)
go func() {
wg.Wait()
close(producedResults)
}()
for i := 0; i < quantityOfWorkers; i++ {
go func() {
defer wg.Done()
...
https://go.dev/play/p/kdgemc5Unsy
0赞
cod3rboy
9/2/2023
#3
只有在所有 worker goroutine 完成执行后,才应关闭通道。每个工作线程都可以通过在使用 A 时调用或关闭专用通道来发出完成信号。
Burak 已经给出了使用 .下面的代码显示了使用专用通道发出信号完成:producedResults
Done()
WaitGroup
WaitGroup
doneSigs := make([]chan struct{}, quantityOfWorkers)
// produce workers
for i := 0; i < quantityOfWorkers; i++ {
done := make(chan struct{})
doneSigs[i] = done
go func() {
defer close(done)
for taskSize := range remainingTasks {
// simulate a long task
time.Sleep(time.Second * time.Duration(taskSize))
// return the result of the long task
producedResults <- taskSize
}
}()
}
go func() {
defer close(producedResults)
for i := range doneSigs {
<-doneSigs[i]
}
}()
评论