结构化并发的替代方案
Java 结构化并发(Structured Concurrency)已经历经 5 年的发展,期间经历了 8 个不同的 JEP(JEP 428、JEP 437、JEP 453、JEP 462、JEP 480、JEP 499、JEP 505、JEP 525)。在我看来,对于一个原本可以算作相当简洁的特性来说,这显得过于冗长了。
我的目标是尝试一种替代方案,该方案充分利用了 Java 自 JDK 1.5 以来经过验证的、稳健的功能。这条路径有可能比 JEP 505 中提出的方案取得更好的效果,因为在我看来,JEP 505 引入了一整套冗余的接口和类,它们实际上只是在重复已有的功能。
毫无疑问,即使在像 Java 这样相对安全的开发环境中(拥有自动垃圾回收、内存管理和严格类型检查),开发者也需要一定的约束。无论提供多么安全的路径,开发者仍然会犯错,例如解引用 null、使用越界索引、吞掉异常等等。而且,并发无疑是最难正确对待的领域——它是 bug 的无穷来源。但首先,让我先介绍一些将贯穿本文使用的辅助代码。
根据 Oracle 的说法,大多数 Java 开发者倾向于以下列方式处理并发执行(摘自 JEP 505 的示例,已修改为使用上述辅助代码):
这里存在一系列严重问题,其中几个在 JEP 的“动机”部分有详细说明:
[LOADING...]
与上述示例相反,Oracle 提出使用其结构化并发 API 作为解决方案,理论上可以解决这些问题:
让我们把注意力转回最初的代码。在经过认真的 QA 工作、编写具有良好代码覆盖率的测试并完成彻底代码审查之后,开发者下一步通常会做什么?最有可能的是,他们会将初始代码块优化为下面更新的版本:
乍一看,这种方法似乎相当有效——在执行过程中出现错误时,任何剩余的 Feature 都会被取消,所有执行线程都被正确终止。然而,仍然存在大量样板代码,在一致地实现时仍显繁琐。没问题,我们可以将公共功能抽取到某个可复用的类中。请参阅 Gist 中的 TaskScope 类。通过这样做,代码发生了显著的变化:
在查看了 Gist 的源码后(为了理解,你绝对应该去看),你会发现一个重要的点:这个实现依赖的是 12 年前发布的 Java 1.8 版本。而且,如果没有使用 java.util.stream.Stream,它甚至可以在 JDK 1.5 上无缝运行!但等等——为什么这里要引入 java.util.stream.Stream?坦率地说,这正是提案的核心。上面的示例 D 高效地处理了一种场景,即等待所有任务完成,并在任何任务失败时抛出错误。支持不同的场景需要更复杂一些的东西。Gist 中分享的 TaskScope 实现将已完成的 Future 队列(无论完成是通过结果、错误还是取消)直接转换为 Stream。好奇这为什么有用?让我们再次重写这个无聊的示例:
这样,我们只需将所有完成的 Future 转换为结果列表,并祈祷没有错误发生。将所有成功完成的 Future 转换为结果列表,完全忽略可能的错误。作用域内不会抛出任何异常:
或者简单地找到第一个可用的结果:
或者,选择前 N 个结果:
在这两个最近的示例中,一旦 main 方法中的 try-with-resources 块退出,所有剩余的 future 将自动被终止。显然,我们也可以在收集结果时处理错误,并提前终止——如果代码逻辑不允许中间错误的话:
如果你已经熟悉 JEP 505,你会明白这里替换了什么:StructuredTaskScope.Joiner。现在,你可以模拟任何类型的“join”行为,而无需子类化/实现 StructuredTaskScope.Joiner。基于完成队列的 Stream 管道 API 是一种开箱即用的表达力极强的工具。而且,随着 Gatherers 的引入,对于真正临时性的场景——例如管理结果窗口(即处理固定大小批次的已准备好结果)——还有更多的发挥空间。
同样值得注意的是,在 JEP 505 中,某些 StructuredTaskScope.Joiner 的实现会生成 Stream 作为输出。然而,是 Joiner 决定了所有 fork 何时完成处理,并在 join 之后打开结果流。而本文描述的替代方法中,join 发生的位置和方式由用户定义的 scope 流逻辑决定。它是一种惰性的、按需的过程——由可能不仅考虑 Future 结果的条件来指导。例如,内部对象状态或作用域内变量等元素可以直接影响决定:收集哪些结果,以及操作中可以忽略哪些错误(如果有)。
现在来谈谈真正的挑战。上述代码的一个显著限制是它无法传播上下文,即当前的 ScopedValue 绑定。这一特性有时被认为是 JEP 505 StructuredTaskScope 的主要优势。公平地说,有人可能会认为这是一种不公平的优势,因为它之所以存在,完全是因为 JDK 内部机制使其成为可能。当前的绑定通过使用 jdk.internal.misc.ThreadFlock 来捕获和传播,这是一个 JDK 外部代码无法访问的工具。
或许,在更理想的情况下,存在一个 JDK 25,它提供了以下官方的 java.util.concurrent.ThreadFactory 接口,为弥合这一差距提供了可能性:
但对我们来说情况并非如此。幸运的是,java.util.concurrent 包中的类提供了极大的可定制性,是非常灵活的工具(对 Dr. Douglas S. Lea 致以崇高敬意)。所以,你可以在 同一个 Gist 中找到另一个类 TaskScopeContextual。这个类将 StructuredTaskScope 适配为 ExecutorService API,其唯一目的是为 fork 的任务传播 ScopedValue 绑定。下面的示例展示了使用这种替代结构化作用域设计的所有优势:
注意通过 Stream 优雅地处理超时。与 JEP 505 中的当前方法不同,无需将其合并到 API 中。
总结一下,概括如下:
- 不需要
StructuredTaskScope.Subtask——现有的java.util.concurrent.FutureAPI 已经足够胜任。 - 因此,引入
StructuredTaskScope.Subtask.State是多余的——即使使用当前的 JEP 505,Future.State已经足够。 StructuredTaskScope.Joiners要求为除最简单情况外的所有情况创建子类。基于已完成 Future 的java.util.stream.Stream管道会是一个更方便的解决方案。StructuredTaskScope.FailedException感觉没有必要——即使在当前 API 中,java.util.concurrent.CompletionException也能很好地完成相同的目的。- 内置的
StructuredTaskScope超时具有难以预测的时间特性(例如,尝试在第一个 fork 之前添加长时间阻塞调用)。显式处理超时要简单且可控得多。
我很想听听读者的意见。您是同意我的想法,还是支持 JDK 团队关于 Futures "在结构化并发中适得其反" 的说法(参见 JEP 505 的“备选方案”部分)?您认为广为人知且适应性强的 Stream API 比 Joiners 更优越,还是一组严格的 Joiners 更简单?
DZone 贡献者表达的观点仅代表其个人。