如何按顺序执行一系列任务?

How to execute an array of tasks sequentially?

提问人:Giulio 提问时间:6/10/2023 最后编辑:Brian BernsGiulio 更新时间:6/11/2023 访问量:140

问:

我有以下要查看的代码

printfn "calling openai embeddings API"
let sw = System.Diagnostics.Stopwatch ()
sw.Start ()
let embs : MyRedis.RedisEmbedding array = 
    docs 
    |> MyOpenAI.rebuild2EmbeddingsSlow             
    
    // here the part of the code to be defined and reviewed
    
sw.Stop ()
printfn "Artificially slowed down to %f" (sw.Elapsed.TotalSeconds)

该代码的目标是显示日志中任务数组的顺序执行的时间,并且可能还保证执行是顺序的而不是并行的,因为我认为,它会更简单。Task.WhenAll

虽然我不得不引入一个累加器,因为返回一个尚未执行的,并且我无法在其中插入计时部分。mutableTask.WhenAllTask

另外,类似的东西

    |> Array.fold (fun s t -> 
        printfn "slow down - intermediate elapsed start %f " (sw.Elapsed.TotalSeconds)
        let! e1 = t
        printfn "intermediate elapsed end %f " (sw.Elapsed.TotalSeconds)
        Array.concat [| e1; s |]
    ) 

不会编译,因为let!

但我想确保以下代码是 F# 惯用代码,并且关键字的使用是不可避免的。mutable

printfn "calling openai embeddings API"
let sw = System.Diagnostics.Stopwatch ()
sw.Start ()
let embs : MyRedis.RedisEmbedding array = 
    docs 
    |> MyOpenAI.rebuild2EmbeddingsSlow             
    |> Array.fold (fun s t -> 
        let mutable acc = [||]
        printfn "slow down - intermediate elapsed start %f " (sw.Elapsed.TotalSeconds)
        task {
            let! e1 = t
            acc <- Array.append e1 s
        } |> Task.WaitAll
        printfn "intermediate elapsed end %f " (sw.Elapsed.TotalSeconds)
        acc
        ) 
        [| |]
sw.Stop ()
printfn "Artificially slowed down to %f" (sw.Elapsed.TotalSeconds)

我认为绑定不能直接在函数内部使用,因为它在计算表达式之外,因此代码确实在函数中正确使用了变量 acc,并且该变量对于在数组中累积嵌入是必要的。let!Array.foldmutableArray.foldmutable

这是可以接受的吗?代码 F# 是惯用的吗?

f# 任务 可变

评论

1赞 Brian Berns 6/10/2023
这看起来很复杂,但我认为您主要是尝试按顺序运行一系列任务,对吧?请注意,任务在创建后立即开始运行,因此它们无法真正按顺序运行。您可以按顺序等待它们,但如果它们已经存在,则无法真正按顺序执行它们。
0赞 Giulio 6/10/2023
@BrianBerns好点,答案可能是我应该使用 F# 冷异步而不是 C# 热任务......
1赞 Brian Berns 6/10/2023
在这种情况下,您可以使用按顺序运行异步数组。Async.Sequential
0赞 Giulio 6/10/2023
这正是我现在正在尝试的!
0赞 Giulio 6/10/2023
@BrianBerns你可以把它写成一个答案,我已经尝试过了,似乎还可以(顺便说一句,我不明白为什么我的总时间几乎总是一样的,看起来在 OpenAI 方面,他们正在以某种“聪明”的方式对电话做出“反应”......你应该澄清我是否可以返回!openAIClient.GetEmbeddingsAsync (model, embeddingOptions) |> Async.AwaitTask 在 |> Array.map 中,后跟一个 Async.Parallel,它将生成每个大小为 N 的 Async<Response<Embeddings> 数组>批次,然后这些批次将进入 |> Async.Sequential

答:

0赞 Giulio 6/10/2023 #1

为了避免变量,我找到了以下解决方案mutable

printfn "calling openai embeddings API"
let sw = System.Diagnostics.Stopwatch ()
sw.Start ()
let embs_timings_task : Task<(MyRedis.RedisEmbedding array) array> = 
    docs 
    |> MyOpenAI.rebuild2EmbeddingsSlow             
    // be kind: fold the async workflows with sequential composition
    |> Array.fold (fun s t -> 
        let sequential = 
            task {
            let curr_start = sw.Elapsed.TotalSeconds
            printfn "intermediate elapsed start %f " curr_start
            let! acc = s 
            let acc = acc |> Array.last
            let! e = t
            let curr_end = sw.Elapsed.TotalSeconds
            printfn "intermediate elapsed end %f " curr_end
            return Array.append acc e
            } 
            |> Task.WhenAll
        // the following is needed to make all the steps sequential instead of parallel
        sequential |> Task.WaitAll
        sequential               ) 
        (Task.FromResult [|[| |] |])
let! embs_timings_arr = embs_timings_task
let embs = embs_timings_arr |> Array.last
sw.Stop ()
printfn "Artificially slowed down to %f" (sw.Elapsed.TotalSeconds)

其中最重要的是内部,这是使所有步骤按顺序而不是并行的关键sequential |> Task.WaitAll

其余的只是从包装器外部包装器的转换,它避免了对结果的需要,但意味着结果,因为计算是按顺序进行的,因此唯一相关。Task.WaitAllTask.WhenAllmutablearraylast

评论

0赞 Giulio 6/10/2023
但是,在这种情况下,使用可变变量 acc 在 Array.fold 函数中累积结果提供了一个清晰直接的解决方案。所以我认为不值得转换代码以避免可变的 acc,因为恕我直言,代码的可读性会降低,而且不一定更符合 F# 惯用语。
0赞 Giulio 6/11/2023
出于历史跟踪目的,我保留了此答案,但我已经从使用 C# 中的热任务切换到使用 F# 中的冷异步。请参考此评论 stackoverflow.com/questions/76445536/... e
1赞 Brian Berns 6/11/2023 #2

下面是从基础 .NET 任务创建 F# 异步数组,然后按顺序执行这些任务的示例。

首先,我编写了一个模拟 OpenAI API 调用的虚拟函数:

let openAiTask i =
    task {
        do! Task.Delay(1000)
        return! Task.FromResult i
    }

然后,我创建了一批对 API 的 F# 异步调用:

let asyncs =
    let sw = System.Diagnostics.Stopwatch()
    sw.Start()
    [| 1..10 |]
        |> Array.map (fun i ->
            async {
                printfn "%A" sw.Elapsed.TotalSeconds
                return! openAiTask i |> Async.AwaitTask
            })

现在,我可以随时按顺序执行异步调用:

let results =
    Async.Sequential asyncs
        |> Async.RunSynchronously
printfn "%A" results

请注意,在我显式运行它们之前,不会发生任何 API 调用。输出如下所示:

0.0238377
1.0503682
2.0602829
3.0719534
4.0832107
5.0838953
6.0951127
7.1090538
8.1239493
9.1347231
[|1; 2; 3; 4; 5; 6; 7; 8; 9; 10|]

评论

0赞 Giulio 6/12/2023
小说明:实际上我有一个调用数组数组,所以 - 与 python 不同,python 在 1 个 1 个请求嵌入时是完全顺序的 - 我的代码具有 n 个调用块的真正并行性 - 从速率限制中获益 - 然后我必须按顺序运行这些并行调用块。是的,当然,从你的答案的角度来看,没有任何变化,它保持不变,我知道。另外,我注意到 openAiTask - 即使它是 Task,而不是 Async- 也没有启动 - 也许很明显,但 imo 至关重要 - 直到我被给出或部分应用该函数。.apply(lambda: ...