提问人:Miriam Scapece 提问时间:11/13/2023 最后编辑:Jonathan HallMiriam Scapece 更新时间:11/13/2023 访问量:36
如何确保通道读取不会超过选择中的任何其他情况
How to ensure a channel read doesn't overtake any other case in a select
问:
我有这 2 个功能:
pointsQueue = make(chan *mongo.UpdateOneModel, 1000)
func UpdatePoints(username string, size int64) {
pointsDifference := -1 * size
update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
updateOp := mongo.NewUpdateOneModel()
updateOp.SetFilter(bson.M{"user": username})
updateOp.SetUpdate(update)
pointsQueue <- updateOp
}
func updatePointsWorker() {
var ctx = context.Background()
ticker := time.NewTicker(dbBatchTimeout)
var bulkRequests []mongo.WriteModel
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
case <-ticker.C:
if len(bulkRequests) > 0 {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
}
}
UpdatePoints
可以在短时间内(几秒)调用数千次,不允许选择股票大小写并清空 bulkRequests 数组。如何确保即使不断调用此案例也能调用?最重要的是,没有请求在通道中“丢失”,因为它总是被新的写入所取代UpdatePoints
我试图缓冲通道,但没有用。我试图在存储一定数量的请求时调用批量写入的情况下设置一个 treshold,但这样我丢失了位于通道底部的请求,因为它们之前总是有新的写入。req := <-pointsQueue:
答:
1赞
Burak Serdar
11/13/2023
#1
运行时应保证公平性,因此股票行情通道不应匮乏。但是,即使这样,您的算法也依赖于速度,因为从通道读取的次数过多可能会将切片增长到不可接受的水平。因此,您应该同时使用计时器和切片长度的上限,以确保切片被正确清空。pointsQueue
empty:=func() {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
if len(bulkRequests) > limit {
empty()
ticker.Reset(dbBatchTimeout)
}
case <-ticker.C:
if len(bulkRequests) > 0 {
empty()
}
}
评论