F# AsyncSeq - 如何按顺序返回值的 mke asyncSeq 迭代?

F# AsyncSeq- How do a mke asyncSeq Iterate in the order values are returned?

提问人:Eli Dowling 提问时间:7/7/2020 最后编辑:Eli Dowling 更新时间:7/9/2020 访问量:404

问:

我在使用 AsyncSeq 时遇到了一些问题,该 AsyncSeq 并行运行一些任务,然后迭代这些并行任务的结果,一次只执行一个任务。我认为这对于AsyncSeq来说应该是完美的,但是,因为它以序列的初始顺序进行迭代,因此它不会在任务进入时执行任务。 很难解释。看这个小模型可能很容易:

    //completes after the given time
    let randomwait time=
        async{
            printfn "started waiting : %i"time
            do! Async.Sleep(time*1000);
            printfn "waited %i" time
            return time
        }
    //Creates 10 tasks in decending order of time taken to complete: 10s,9s 8s, etc
    let stream=
        asyncSeq{
            for i=10 downto 1 do
                let waitTime= i
                yield randomwait waitTime
        }
    let run =
            
        let task=
            stream 
               |> AsyncSeq.mapAsyncParallel id // This runs all our randomWait tasks at once
               |>AsyncSeq. 1(fun time ->async{ printfn "printing for time : %i" time})
        Async.RunSynchronously task

我希望代码输出以下内容,每次打印之间有一秒钟的延迟。

Printing for Time: 1
Printing for Time: 2
etc etc

但是,由于迭代顺序不是由上一个并行任务中的完成顺序设置的,因此结果是向后的,并且在第一个 10 秒任务完成后立即打印所有结果。

Printing for Time: 10
Printing for Time: 9
etc etc

任何帮助将不胜感激。如有必要,我很乐意使用其他解决方案,任何允许并行的解决方案,然后一次迭代一个。

异步 F# 序列

评论


答:

2赞 Tomas Petricek 7/7/2020 #1

代码中最重要的关键操作是 。这将循环访问输入异步序列,启动所有任务,然后按照启动任务的顺序生成结果。mapAsyncParallel

该操作不会等待所有任务完成,但只有在它产生所有 N-1 个早期任务的结果后,它才会产生第 N 个任务的结果。

以下示例流比示例更好地演示了该行为:

let stream=
    asyncSeq { 
        for waitTime = 5 downto 1 do
            yield randomwait waitTime
        for waitTime = 10 to 15 do
            yield randomwait waitTime
    }

如果以此为例,您的代码将等待 5 秒,然后它将打印“打印时间”5、4、3、2、1(因为它必须等待 5 秒,直到第一个任务完成,同时,剩余的 4 个完成),但随后它将再等待 5 秒并打印“打印时间”6, 等待 1 秒,打印 7,等待 1 秒,打印 8 等。

如果仅替换为 ,则代码将按顺序运行任务,并(按顺序)等待每个任务完成。这样一来,您就不会同时发生任何事情,等待时间也会更长。mapAsyncParallemapAsync

要做(我认为)你想做的事情,最好的选择是从 using 切换到 using .异步序列是连续的,并保留元素的顺序。Observable 不这样做。使用 FSharp.Control.Reactive 库,您可以执行以下操作:AsyncSeq<T>Observable<T>

let task=
    stream 
    |> AsyncSeq.toObservable
    |> Observable.bind Observable.ofAsync 
    |> Observable.iter (fun time -> printfn "printing for time : %i" time)

Observable.wait task |> ignore

在这里,该操作采用一个可观察对象,对于每个生成的值,它启动一个新的可观察对象(在我们的例子中,它只生成一个结果),但随后它按照它们到达的顺序收集所有结果,因此您首先获得结果,即使它是作为第五个元素开始的。bind1

评论

0赞 Eli Dowling 7/7/2020
谢谢,我明天会测试这个。我曾尝试过可观察对象,但我真的很难弄清楚它们应该如何与 asycseq 互操作。这以及可观察模块中存在多少疯狂的看似神秘的函数。
0赞 Tomas Petricek 7/7/2020
我能理解这一点!我真的很喜欢异步序列,因为它们在逻辑上非常简单(顺序但异步)。可观察对象是基于推送的(非顺序的),这使得整个逻辑更加棘手。但在这种情况下,它完全符合您的需求(我认为)。Observable.bind
0赞 Tomas Petricek 7/7/2020
从 AsyncSeq 转换为 Observable 很容易,但反过来就很棘手了(因为 Observable 可能会产生比 AsyncSeq 的使用者想要处理的更多的东西 - 因此您要么丢失数据,要么需要缓冲区)。
0赞 Eli Dowling 7/9/2020 #2

非常感谢托马斯·佩特里切克。 这是我的最终结果:

let observables= schedulesGrouped|>List.map(fun (schedules,groupName)->
            printfn "Setting up observables for group: %s" groupName
            schedules
                |>AsyncSeq.toObservable
                |>Observable.bind Observable.ofAsync
                |>Observable.iter(fun transferTask ->
                    Async.Start( processTask groupName transferTask))

            )
        let outPut=observables|>Observable.mergeSeq
        outPut|>Observable.wait

对你最初的想法进行了一些调整,因为我想运行一个在迭代中部分异步的任务,但它本质上仍然相同。

非常感谢您的帮助。