Scalaz 中的异步迭代处理

Asynchronous iteratee processing in Scalaz

提问人:Aaron Novstrup 提问时间:9/28/2013 最后编辑:CommunityAaron Novstrup 更新时间:10/6/2013 访问量:510

问:

我一直在使用 Scalaz 7 迭代来处理恒定堆空间中的大型(即无限)数据流。

在代码中,它看起来像这样:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Result]] =
  Iteratee.fold[Chunk, ErrorOr, List[Result]](Nil) { (rs, c) =>
    processChunk(c) :: rs
  } &= data

现在,我想并行执行处理,一次处理 P 个数据块。我仍然需要限制堆空间,但可以合理地假设有足够的堆来存储 P 个数据块和计算的累积结果。

我知道这个类,并想到了在枚举器上映射以创建任务流:Task

data map (c => Task.delay(processChunk(c)))

但我仍然不确定如何管理非确定性。在使用流时,如何确保 P 任务尽可能运行?

第一次尝试:

我对解决方案的第一个尝试是折叠流并创建一个 Scala 来处理每个块。然而,该程序因 GC 开销错误而爆炸(可能是因为它在尝试创建所有 s 时将所有块拉入内存)。相反,迭代者需要在已经有 P 个任务运行时停止消耗输入,并在其中任何一个任务完成时再次恢复。FutureFuture

第二次尝试:

我的下一个尝试是将流分组为 P 大小的部分,并行处理每个部分,然后在继续下一部分之前加入:

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, Vector[Result]] =
  Iteratee.foldM[Vector[Chunk], ErrorOr, Vector[Result]](Nil) { (rs, cs) =>
    tryIO(IO(rs ++ Await.result(
      Future.traverse(cs) { 
        c => Future(processChunk(c)) 
      }, 
      Duration.Inf)))
  } &= (data mapE Iteratee.group(P))

虽然这不会充分利用可用的处理器(特别是因为处理每个处理器所需的时间可能会有很大差异),但这将是一个改进。然而,枚举者似乎泄漏了内存 - 堆使用率突然飙升Chunk

异步 Scalaz 循环

评论

0赞 om-nom-nom 10/2/2013
仅供参考,play 迭代(基于 scala futures)也存在内存泄漏。这种情况最近已得到解决
0赞 Aaron Novstrup 10/3/2013
@om-nom-nom 内存泄漏似乎与 Scala Futures 的使用无关。请参阅此相关问题
0赞 om-nom-nom 10/3/2013
实际上想对这个问题发表评论:-)只是说 scalaz 期货和 scala 期货在概念上可能有同样的缺陷

答: 暂无答案