限制运行的并发任务数

Limiting the number of concurrent tasks running

提问人:Rodrigo 提问时间:2/11/2016 更新时间:2/22/2020 访问量:7857

问:

所以我经常遇到这个问题。假设我有一个包含 100,000 行文本的文本文件。现在我想将所有这些行保存到一个数据库中。所以我会做这样的事情:go

file, _ := iotuil.ReadFile("file.txt")

fileLines := strings.Split(string(file), "\n")

现在,我将遍历文件中的所有行:

for _, l := range fileLines{
  saveToDB(l)
}

现在我想同时运行这个函数:saveToDB

var wg sync.WaitGroup

for _, l := range fileLines{
  wg.Add(1)
  go saveToDB(l, &wg)
}

wg.Wait()

我不知道这是否是一个问题,但这会运行 100,000 个并发函数。有没有办法说,嘿,运行 100 个并发函数,等待所有这些函数完成,然后再运行 100 个。

for i, _ := range fileLine {
  for t = 0; t < 100; t++{
    wg.Add(1)
    go saveToDB(fileLine[i], &wg)
  }
  wg.Wait()
}

我是否需要做这样的事情,或者有没有更清洁的方法来解决这个问题?还是我运行 100,000 个并发任务不是问题?

Go 并发

评论

0赞 Rodrigo 2/12/2016
关于该主题的精彩阅读: nesv.github.io/golang/2014/02/25/worker-queues-in-go.html

答:

20赞 Not_a_Golfer 2/11/2016 #1

我认为最好的方法是保留一个工人 goroutines 池,在频道中为他们分配工作,然后关闭频道以便他们退出。

像这样的东西:

// create a channel for work "tasks"
ch := make(chan string)

wg := sync.WaitGroup{}

// start the workers
for t := 0; t < 100; t++{
    wg.Add(1)
    go saveToDB(ch, &wg)
}

// push the lines to the queue channel for processing
for _, line := range fileline {
    ch <- line
}

// this will cause the workers to stop and exit their receive loop
close(ch)

// make sure they all exit
wg.Wait()

然后 saveFunction 如下所示:

func saveToDB(ch chan string, wg *sync.WaitGroup) {
    // cnosume a line
    for line := range ch {
        // do work
        actuallySaveToDB(line)
    }
    // we've exited the loop when the dispatcher closed the channel, 
    // so now we can just signal the workGroup we're done
    wg.Done()
}

评论

0赞 evanmcdonnal 2/11/2016
是的,imo 这很好地使用了 Go 提供的结构。
0赞 Sean 2/11/2016
我要对上述内容进行的一项补充是使其成为一个缓冲通道,因此您不会向该通道写入无限的新行。golang.org/doc/effective_go.html#channels
0赞 Not_a_Golfer 2/11/2016
@Sean,如果它没有缓冲,你就不会向通道写入无限的新行 - 你根本不会在它上面发送行,除非所有的工作线程都消耗了所有内容。缓冲通道将允许您写入比消耗的多 N 行,这将导致在工作线程完成之前完成调度,它不会加速或减慢任何事情