Ohhnews

分类导航

$ cd ..
DZone Java原文

结构化并发的替代方案

#java#结构化并发#jep 505#taskscope#stream api

Java 结构化并发(Structured Concurrency)已经历经 5 年的发展,期间经历了 8 个不同的 JEP(JEP 428、JEP 437、JEP 453、JEP 462、JEP 480、JEP 499、JEP 505、JEP 525)。在我看来,对于一个原本可以算作相当简洁的特性来说,这显得过于冗长了。

我的目标是尝试一种替代方案,该方案充分利用了 Java 自 JDK 1.5 以来经过验证的、稳健的功能。这条路径有可能比 JEP 505 中提出的方案取得更好的效果,因为在我看来,JEP 505 引入了一整套冗余的接口和类,它们实际上只是在重复已有的功能。

毫无疑问,即使在像 Java 这样相对安全的开发环境中(拥有自动垃圾回收、内存管理和严格类型检查),开发者也需要一定的约束。无论提供多么安全的路径,开发者仍然会犯错,例如解引用 null、使用越界索引、吞掉异常等等。而且,并发无疑是最难正确对待的领域——它是 bug 的无穷来源。但首先,让我先介绍一些将贯穿本文使用的辅助代码。

$ java
// Example Proto
package net.tascalate.concurrentx;
// imports here
public class FuturesDemo {
    static final ScopedValue<String> DEMO_SV = ScopedValue.newInstance();

    // This emulates long-running calls
    // we need to execute asynchronously --
    // all we do is returning value after the delay
    // or throw a supplied exception to emulate error
    private static <T> Callable<T> produceValue(T value, long delay) {
        return () -> {
            var start = System.currentTimeMillis();
            try {
                System.out.println(">> Waiting value: " + value + " (SCOPED VALIUE IS " + DEMO_SV.orElse(" ") + ")");
                Thread.sleep(delay);
                System.out.println(">> Producing value: " + value);
                if (value instanceof Exception) {
                    throw (Exception)value;
                } else {
                    return value;
                }
            } finally {
                var finish = System.currentTimeMillis();
                System.out.println(">> Exiting " + value + ", " + Thread.currentThread() + ", done in " + (finish - start) + "ms, vs " + delay + "ms specified");
            }
        };
    }

    public static void main(String[] argv) {
        // implementation will be here
    }
}

根据 Oracle 的说法,大多数 Java 开发者倾向于以下列方式处理并发执行(摘自 JEP 505 的示例,已修改为使用上述辅助代码):

$ java
// Example A - "unstructured concurrency"
public static void main(String[] argv) throws InterruptedException, ExecutionException {
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();
    try {
        Future<Object> a = executor.submit( produceValue("A", 1000));
        Future<Object> b = executor.submit( produceValue(LocalDateTime.now(), 1500));
        Future<Object> c = executor.submit( produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " + (finish - start) + "ms");
        executor.shutdownNow();
    }
}

这里存在一系列严重问题,其中几个在 JEP 的“动机”部分有详细说明:

[LOADING...]

与上述示例相反,Oracle 提出使用其结构化并发 API 作为解决方案,理论上可以解决这些问题:

$ java
// Example B -- structured concurrency
@SuppressWarnings("preview")
public static void main(String[] argv) throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = StructuredTaskScope.open(
        StructuredTaskScope.Joiner.allSuccessfulOrThrow())) {
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));
        scope.join();
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } catch (StructuredTaskScope.FailedException ex) {
        System.out.println("*** ALL exception: " + ex.getCause());
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " + (finish - start) + "ms");
    }
}

让我们把注意力转回最初的代码。在经过认真的 QA 工作、编写具有良好代码覆盖率的测试并完成彻底代码审查之后,开发者下一步通常会做什么?最有可能的是,他们会将初始代码块优化为下面更新的版本:

$ java
// Example C - fixed "unstructured concurrency" from Example A
public static void main(String[] argv) throws InterruptedException, ExecutionException {
    Future<Object> a = null;
    Future<Object> b = null;
    Future<Object> c = null;
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var start = System.currentTimeMillis();
    try {
        a = executor.submit(produceValue("A", 1000));
        b = executor.submit(produceValue(LocalDateTime.now(), 1500));
        c = executor.submit(produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        Stream.of(a, b, c)
            .filter(Objects::nonNull)
            .forEach(f -> f.cancel(true));
        System.out.println(
            "*** Exiting main, executed in " + (finish - start) + "ms");
        executor.shutdownNow();
    }
}

乍一看,这种方法似乎相当有效——在执行过程中出现错误时,任何剩余的 Feature 都会被取消,所有执行线程都被正确终止。然而,仍然存在大量样板代码,在一致地实现时仍显繁琐。没问题,我们可以将公共功能抽取到某个可复用的类中。请参阅 Gist 中的 TaskScope 类。通过这样做,代码发生了显著的变化:

$ java
// Example D - fixed "unstructured concurrency" from Example A
// with a reusable TaskScope class
public static void main(String[] argv) throws InterruptedException, ExecutionException {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
        Executors.newVirtualThreadPerTaskExecutor())) {
        var a = scope.fork(produceValue("A", 1000));
        var b = scope.fork(produceValue(LocalDateTime.now(), 1500));
        var c = scope.fork(produceValue(BigInteger.valueOf(42), 500));
        var result = List.of(a.get(), b.get(), c.get());
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " + (finish - start) + "ms");
    }
}

在查看了 Gist 的源码后(为了理解,你绝对应该去看),你会发现一个重要的点:这个实现依赖的是 12 年前发布的 Java 1.8 版本。而且,如果没有使用 java.util.stream.Stream,它甚至可以在 JDK 1.5 上无缝运行!但等等——为什么这里要引入 java.util.stream.Stream?坦率地说,这正是提案的核心。上面的示例 D 高效地处理了一种场景,即等待所有任务完成,并在任何任务失败时抛出错误。支持不同的场景需要更复杂一些的东西。Gist 中分享的 TaskScope 实现将已完成的 Future 队列(无论完成是通过结果、错误还是取消)直接转换为 Stream。好奇这为什么有用?让我们再次重写这个无聊的示例:

$ java
// Example E - same as Example D but with Stream pipeline
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    try (var scope = new TaskScope(
        Executors.newVirtualThreadPerTaskExecutor())) {
        scope.fork(produceValue("A", 1000));
        scope.fork(produceValue(LocalDateTime.now(), 1500));
        scope.fork(produceValue(BigInteger.valueOf(42), 500));
        var result = scope.completions()
            .map(Future::resultNow)
            .toList();
        System.out.println("*** ALL result: " + result);
    } finally {
        var finish = System.currentTimeMillis();
        System.out.println(
            "*** Exiting main, executed in " + (finish - start) + "ms");
    }
}

这样,我们只需将所有完成的 Future 转换为结果列表,并祈祷没有错误发生。将所有成功完成的 Future 转换为结果列表,完全忽略可能的错误。作用域内不会抛出任何异常:

$ java
var result = scope.completions()
    .filter(f -> f.state() == Future.State.SUCCESS)
    .map(Future::resultNow)
    .toList();

或者简单地找到第一个可用的结果:

$ java
var result = scope.completions()
    .filter(f -> f.state() == Future.State.SUCCESS)
    .map(Future::resultNow)
    .findAny()
    .orElse("");

或者,选择前 N 个结果:

$ java
var N = 5;
var result = scope.completions()
    .filter(f -> f.state() == Future.State.SUCCESS)
    .map(Future::resultNow)
    .limit(N)
    .toList();

在这两个最近的示例中,一旦 main 方法中的 try-with-resources 块退出,所有剩余的 future 将自动被终止。显然,我们也可以在收集结果时处理错误,并提前终止——如果代码逻辑不允许中间错误的话:

$ java
var result = scope.completions()
    .peek(f -> {
        if (f.state() == Future.State.FAILED)
            throw new CompletionException(f.exceptionNow());
    })
    .map(Future::resultNow)
    .limit(2)
    .toList();

如果你已经熟悉 JEP 505,你会明白这里替换了什么:StructuredTaskScope.Joiner。现在,你可以模拟任何类型的“join”行为,而无需子类化/实现 StructuredTaskScope.Joiner。基于完成队列的 Stream 管道 API 是一种开箱即用的表达力极强的工具。而且,随着 Gatherers 的引入,对于真正临时性的场景——例如管理结果窗口(即处理固定大小批次的已准备好结果)——还有更多的发挥空间。

同样值得注意的是,在 JEP 505 中,某些 StructuredTaskScope.Joiner 的实现会生成 Stream 作为输出。然而,是 Joiner 决定了所有 fork 何时完成处理,并在 join 之后打开结果流。而本文描述的替代方法中,join 发生的位置和方式由用户定义的 scope 流逻辑决定。它是一种惰性的、按需的过程——由可能不仅考虑 Future 结果的条件来指导。例如,内部对象状态或作用域内变量等元素可以直接影响决定:收集哪些结果,以及操作中可以忽略哪些错误(如果有)。

现在来谈谈真正的挑战。上述代码的一个显著限制是它无法传播上下文,即当前的 ScopedValue 绑定。这一特性有时被认为是 JEP 505 StructuredTaskScope 的主要优势。公平地说,有人可能会认为这是一种不公平的优势,因为它之所以存在,完全是因为 JDK 内部机制使其成为可能。当前的绑定通过使用 jdk.internal.misc.ThreadFlock 来捕获和传播,这是一个 JDK 外部代码无法访问的工具。

或许,在更理想的情况下,存在一个 JDK 25,它提供了以下官方的 java.util.concurrent.ThreadFactory 接口,为弥合这一差距提供了可能性:

$ java
public interface ThreadFactory {
    abstract Thread newThread(Runnable code);
    default ThreadFactory captureContext() {
        ThreadFactory delegate = this;
        Object currentScopedValueBindings = SomeInternalClass.captureValueBindingsForTheCurrentThread();
        return new ThreadFactory() {
            public Thread newThread(Runnable code) {
                Thread result = delegate.newThread(code);
                SomeInternalClass.applyValueBindings(result);
                return result;
            }
        };
    }
}

但对我们来说情况并非如此。幸运的是,java.util.concurrent 包中的类提供了极大的可定制性,是非常灵活的工具(对 Dr. Douglas S. Lea 致以崇高敬意)。所以,你可以在 同一个 Gist 中找到另一个类 TaskScopeContextual。这个类将 StructuredTaskScope 适配为 ExecutorService API,其唯一目的是为 fork 的任务传播 ScopedValue 绑定。下面的示例展示了使用这种替代结构化作用域设计的所有优势:

$ java
// Example F - true structured concurrency with context passing
public static void main(String[] argv) {
    var start = System.currentTimeMillis();
    ScopedValue.where(DEMO_SV, "VALUE_DEFINED_IN_MAIN").call(() -> {
        try (var scope = new TaskScopeContextual()) {
            scope.fork(produceValue("A", 1000));
            scope.fork(produceValue("B", 2000));
            scope.fork(produceValue("C", 2000));
            scope.fork(produceValue("D", 2000));
            var timeout = scope.fork(produceValue(null, 2500));
            scope.fork(produceValue("E", 2000));
            scope.fork(produceValue("F", 3000));
            scope.fork(produceValue("G", 3000));
            var result = scope.completions()
                .takeWhile(f -> f != timeout)
                .filter(f -> f.state() == Future.State.SUCCESS)
                .limit(6)
                .map(Future::resultNow)
                .sorted()
                .toList();
            System.out.println("*** ALL result: " + result);
        } finally {
            var finish = System.currentTimeMillis();
            System.out.println(
                "*** Exiting main, executed in " + (finish - start) + "ms");
        }
        return null;
    });
}

注意通过 Stream 优雅地处理超时。与 JEP 505 中的当前方法不同,无需将其合并到 API 中。

总结一下,概括如下:

  1. 不需要 StructuredTaskScope.Subtask——现有的 java.util.concurrent.Future API 已经足够胜任。
  2. 因此,引入 StructuredTaskScope.Subtask.State 是多余的——即使使用当前的 JEP 505,Future.State 已经足够。
  3. StructuredTaskScope.Joiners 要求为除最简单情况外的所有情况创建子类。基于已完成 Future 的 java.util.stream.Stream 管道会是一个更方便的解决方案。
  4. StructuredTaskScope.FailedException 感觉没有必要——即使在当前 API 中,java.util.concurrent.CompletionException 也能很好地完成相同的目的。
  5. 内置的 StructuredTaskScope 超时具有难以预测的时间特性(例如,尝试在第一个 fork 之前添加长时间阻塞调用)。显式处理超时要简单且可控得多。

我很想听听读者的意见。您是同意我的想法,还是支持 JDK 团队关于 Futures "在结构化并发中适得其反" 的说法(参见 JEP 505 的“备选方案”部分)?您认为广为人知且适应性强的 Stream API 比 Joiners 更优越,还是一组严格的 Joiners 更简单?

DZone 贡献者表达的观点仅代表其个人。