parallelStream().forEach 给出不一致的输出

parallelStream().forEach giving inconsistent output

提问人:Arushi awasthi 提问时间:11/8/2023 最后编辑:HolgerArushi awasthi 更新时间:11/10/2023 访问量:103

问:

我有如下代码,

List<String> itemIdList = new ArrayList<>();
List<String> linkedItemIds = new ArrayList<>();
items.parallelStream().forEach(item -> {
    itemIdList.add(item.getId());
    validateLinkedItemStatus(order, error);
    if (CollectionUtils.isNotEmpty(error)) {
        //some db validation
        return;
    }
    linkedItemIds.add(item.getLinkedId());
});

由于项目数量众多,我想并行执行此任务,但每次执行此循环时,相同项目的 itemIdList 大小都不同。有人能告诉我这里出了什么问题吗?

java foreach 并行处理 java-stream

评论

4赞 tgdavies 11/8/2023
ArrayList.add不是线程安全的。
2赞 Holger 11/8/2023
为了从 Stream API 中获得优势,您没有办法了解它的各种操作,包括收集到 的惯用方式,而不是像旧循环一样使用它。使用 ,您会遇到问题中的线程安全问题,或者如果您试图同时解决线程安全问题甚至两种问题,则会遇到性能问题。ListforEachforEach

答:

1赞 Chaosfire 11/8/2023 #1

该问题是由于不正确地使用副作用引起的。Stream API 文档明确不鼓励此类使用:

通常,不鼓励对流操作的行为参数产生副作用,因为它们通常会导致在不知不觉中违反无状态要求,以及其他线程安全隐患。 如果行为参数确实有副作用,除非明确说明,否则无法保证这些副作用对其他线程的可见性,也无法保证在同一线程中对同一流管道中的“同一”元素执行不同的操作。

由于您需要并且可选,您可以做的是在收集之前对 ID 使用包装器对象。idlinkedId

public record IdWrapper(String id, String linkedId) {

  public IdWrapper(String id) {
    this(id, null);
  }
}

并在对象中有条件地添加链接的 id。

List<IdWrapper> wrappers = items.parallelStream()
            .map(item -> {
              validateLinkedItemStatus(order, error);
              return CollectionUtils.isNotEmpty(error) ? new IdWrapper(item.id()) : new IdWrapper(item.id(), item.linkedId());
            })
            .toList();//or collect(Collectors.toList()) depending on the exact scenario

如果你真的想/需要使用当前的方法,为了得到正确的结果,你需要使你正在改变的列表同步:forEach()

List<String> itemIdList = Collections.synchronizedList(new ArrayList<>());
List<String> linkedItemIds = Collections.synchronizedList(new ArrayList<>());

但是,这将破坏您试图通过并行性实现的性能优势。

如果并行执行,ArrayList 的非线程安全性将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。

正如 Holger 在评论中指出的那样,另一个可能的问题是不能保证尊重流的遭遇顺序。forEach

评论

2赞 Holger 11/9/2023
使用同步列表不仅会导致性能问题。结果元素顺序可能与源列表的原始相遇顺序不匹配。解决方法是使用而不是使用会进一步损害性能......forEachforEachOrderedforEach
2赞 Holger 11/9/2023
另一件应该提到的是:令人怀疑的是,无论是条件还是条件实际上都不依赖于。这可能表明存在其他隐藏状态,这些隐藏状态不太可能是线程安全的。如果不是这种情况,则不应对每个 .validateLinkedItemStatus(order, error);CollectionUtils.isNotEmpty(error)itemitem
0赞 Dawid Witaszek 11/8/2023 #2

尝试同步列表:itemIdList 和 linkedItemIds。这看起来像是一个种族问题。您可以使用 Collections.synchronizedList(list) 轻松同步列表

线程安全数据结构的良好示例在以下主题中(在 Java 中选择最佳并发列表)

评论

2赞 Chaosfire 11/8/2023
这将起作用,但请注意,外部同步会损害并行流的性能。