提问人:Rodrigo 提问时间:2/11/2016 更新时间:2/22/2020 访问量:7857
限制运行的并发任务数
Limiting the number of concurrent tasks running
问:
所以我经常遇到这个问题。假设我有一个包含 100,000 行文本的文本文件。现在我想将所有这些行保存到一个数据库中。所以我会做这样的事情:go
file, _ := iotuil.ReadFile("file.txt")
fileLines := strings.Split(string(file), "\n")
现在,我将遍历文件中的所有行:
for _, l := range fileLines{
saveToDB(l)
}
现在我想同时运行这个函数:saveToDB
var wg sync.WaitGroup
for _, l := range fileLines{
wg.Add(1)
go saveToDB(l, &wg)
}
wg.Wait()
我不知道这是否是一个问题,但这会运行 100,000 个并发函数。有没有办法说,嘿,运行 100 个并发函数,等待所有这些函数完成,然后再运行 100 个。
for i, _ := range fileLine {
for t = 0; t < 100; t++{
wg.Add(1)
go saveToDB(fileLine[i], &wg)
}
wg.Wait()
}
我是否需要做这样的事情,或者有没有更清洁的方法来解决这个问题?还是我运行 100,000 个并发任务不是问题?
答:
20赞
Not_a_Golfer
2/11/2016
#1
我认为最好的方法是保留一个工人 goroutines 池,在频道中为他们分配工作,然后关闭频道以便他们退出。
像这样的东西:
// create a channel for work "tasks"
ch := make(chan string)
wg := sync.WaitGroup{}
// start the workers
for t := 0; t < 100; t++{
wg.Add(1)
go saveToDB(ch, &wg)
}
// push the lines to the queue channel for processing
for _, line := range fileline {
ch <- line
}
// this will cause the workers to stop and exit their receive loop
close(ch)
// make sure they all exit
wg.Wait()
然后 saveFunction 如下所示:
func saveToDB(ch chan string, wg *sync.WaitGroup) {
// cnosume a line
for line := range ch {
// do work
actuallySaveToDB(line)
}
// we've exited the loop when the dispatcher closed the channel,
// so now we can just signal the workGroup we're done
wg.Done()
}
评论
0赞
evanmcdonnal
2/11/2016
是的,imo 这很好地使用了 Go 提供的结构。
0赞
Sean
2/11/2016
我要对上述内容进行的一项补充是使其成为一个缓冲通道,因此您不会向该通道写入无限的新行。golang.org/doc/effective_go.html#channels
0赞
Not_a_Golfer
2/11/2016
@Sean,如果它没有缓冲,你就不会向通道写入无限的新行 - 你根本不会在它上面发送行,除非所有的工作线程都消耗了所有内容。缓冲通道将允许您写入比消耗的多 N 行,这将导致在工作线程完成之前完成调度,它不会加速或减慢任何事情
上一个:创建 go socks5 客户端
下一个:捕获 execSync 错误
评论