如何确保通道读取不会超过选择中的任何其他情况

How to ensure a channel read doesn't overtake any other case in a select

提问人:Miriam Scapece 提问时间:11/13/2023 最后编辑:Jonathan HallMiriam Scapece 更新时间:11/13/2023 访问量:36

问:

我有这 2 个功能:

pointsQueue = make(chan *mongo.UpdateOneModel, 1000)

func UpdatePoints(username string, size int64) {
    pointsDifference := -1 * size
    update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
    updateOp := mongo.NewUpdateOneModel()
    updateOp.SetFilter(bson.M{"user": username})
    updateOp.SetUpdate(update)
    pointsQueue <- updateOp
}

func updatePointsWorker() {
    var ctx = context.Background()
    ticker := time.NewTicker(dbBatchTimeout)
    var bulkRequests []mongo.WriteModel

    for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                _, err := usersCollection.BulkWrite(ctx, bulkRequests)
                if err != nil {
                   fmt.Println(err.Error())
                }
                bulkRequests = nil
            }
    }
}

UpdatePoints可以在短时间内(几秒)调用数千次,不允许选择股票大小写并清空 bulkRequests 数组。如何确保即使不断调用此案例也能调用?最重要的是,没有请求在通道中“丢失”,因为它总是被新的写入所取代UpdatePoints

我试图缓冲通道,但没有用。我试图在存储一定数量的请求时调用批量写入的情况下设置一个 treshold,但这样我丢失了位于通道底部的请求,因为它们之前总是有新的写入。req := <-pointsQueue:

go 并发 通道 goroutine

评论

0赞 JimB 11/13/2023
你不需要做任何事情。通过统一的伪随机选择来选择可以继续进行的案例,确保所有案例最终具有相等的概率。

答:

1赞 Burak Serdar 11/13/2023 #1

运行时应保证公平性,因此股票行情通道不应匮乏。但是,即使这样,您的算法也依赖于速度,因为从通道读取的次数过多可能会将切片增长到不可接受的水平。因此,您应该同时使用计时器和切片长度的上限,以确保切片被正确清空。pointsQueue

empty:=func() {
  _, err := usersCollection.BulkWrite(ctx, bulkRequests)
  if err != nil {
     fmt.Println(err.Error())
  }
  bulkRequests = nil
}

for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
            if len(bulkRequests) > limit {
               empty()
               ticker.Reset(dbBatchTimeout)
            }
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                empty()
            }
    }