https://www.baeldung.com/java-completablefuture
2. java 异步计算
异步计算很难推理。通常我们希望将任何计算视为一系列步骤。但是在异步计算的情况下,表示为回调的动作往往分散在代码中或者深深地嵌套在彼此内部。当我们需要处理其中一个步骤中可能发生的错误时,情况变得更糟。
Future 5接口在Java 5中添加,作为异步计算的结果,但它没有任何方法来组合这些计算或处理可能的错误。 在Java 8中,引入了CompletableFuture类。与Future接口一起,它还实现了CompletionStage接口。此接口定义了可与其他步骤组合的异步计算步骤的契约。
CompletableFuture同时是一个构建块和一个框架,具有大约50种不同的组合,组合,执行异步计算步骤和处理错误的方法。 如此庞大的API可能会令人难以招架,但这些API大多属于几个明确且不同的用例。
3. Using CompletableFuture as a Simple Future
首先,CompletableFuture类实现Future接口,因此您可以将其用作Future实现,但具有额外的完成逻辑(completion logic)。
您可以使用无参(no-arg)构造函数创建此类的实例以表示未来的某些结果,将其交给使用者并在将来的某个时间使用complete方法完成。(该类 实例的)使用者可以使用get方法来阻止当前线程,直到提供此结果。
例子: 计算完成后,该方法通过将结果提供给complete方法来完成Future
public
Future<String> calculateAsync() throws
InterruptedException {
CompletableFuture<String> completableFuture
= new
CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return
null;
});
return
completableFuture;
}
我们使用 Executor API (“Introduction to Thread Pools in Java”, )- 这种创建和完成CompletableFuture的方法可以与任何并发机制或API(包括 raw threads)一起使用。
注意这里返回的是一个 Future instance。
我们调用方法,接收Future实例并在我们准备阻塞结果时调用它的get方法。
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
另请注意,get方法会抛出一些 checked exceptions 即ExecutionException(封装计算期间发生的异常)和InterruptedException(表示执行方法的线程被中断的异常)
如果您已经知道计算的结果,则可以将static completedFuture方法与表示此计算结果的参数一起使用。那么get方法不会被阻塞而是立即返回。
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
作为替代方案,您可能希望取消Future的执行。
假设我们没有设法找到结果并决定完全取消异步执行。这可以使用Future的cancel方法完成。此方法接收布尔参数mayInterruptIfRunning,但在CompletableFuture的情况下,它没有任何效果,因为中断不用于控制CompletableFuture的处理。
异步方法的修改版本:
public
Future<String> calculateAsyncWithCancellation() throws
InterruptedException {
CompletableFuture<String> completableFuture = new
CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return
null;
});
return
completableFuture;
}
当我们使用Future.get()方法阻塞结果时,如果 future 取消,它将抛出CancellationException
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
4.具有封装计算逻辑的CompletableFuture
上面的代码允许我们选择任何并发执行机制,但是如果我们想要跳过这个样板并简单地异步执行一些代码呢?
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
5.处理异步计算的结果
处理计算结果的最通用方法是将其提供给函数。 thenApply方法:接受一个Function实例,用它来处理结果并返回一个包含函数返回值的Future:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
如果您不需要在Future链中返回值,则可以使用Consumer功能接口的实例。它的单个方法接受一个参数并返回void。 在CompletableFuture中有一个用于此用例的方法 - thenAccept方法接收Consumer并将计算结果传递给它。最后的future.get()调用返回Void类型的实例
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: "
+ s));
future.get();
最后,如果您既不需要计算的值也不想在链的末尾返回一些值,那么您可以将Runnable lambda传递给thenRun方法。在下面的示例中,在调用future.get()方法之后,我们只需在控制台中打印一行:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
CompletableFuture API的最牛的是能够在一系列计算步骤中组合CompletableFuture实例
这种链接的结果本身就是CompletableFuture,允许进一步链接和组合。这种方法在函数式语言中无处不在(ubiquitous),通常被称为一元(monadic)设计模式。
在下面的示例中,我们使用thenCompose方法按顺序链接两个Futures
请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose方法与thenApply一起实现一元(monadic)模式的基本构建块。它们与Java 8中可用的Stream和Optional类的map和flatMap方法密切相关。
两个方法都接收一个函数并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个对象。此功能结构允许将这些类的实例组合为构建块。
如果要执行两个独立的Futures并对其结果执行某些操作,请使用接受Future的thenCombine方法和具有两个参数的Function来处理两个结果:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
更简单的情况是,当您想要使用两个future结果时,但不需要将任何结果值传递给Future链。 thenAcceptBoth方法可以提供帮助
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7. Difference Between thenApply() and thenCompose()
thenApply()
此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型将合并所有调用。因此,当我们想要转换CompletableFuture调用的结果时,此方法很有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
thenCompose()
thenCompose()方法类似于thenApply(),它们都返回一个新的Completion Stage。但是,thenCompose()使用前一个阶段作为参数。它会直接 flatten 并返回Future,而不是我们在thenApply()中观察到的嵌套 future:
CompletableFuture<Integer> computeAnother(Integer i){
return
CompletableFuture.supplyAsync(() -> 10
+ i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果想要链接CompletableFuture方法,那么最好使用thenCompose()。
另外,这两种方法之间的差异类似于map()和flatMap()之间的差异
allof()
当我们需要并行执行多个Futures时,我们通常希望等待所有它们执行,然后处理它们的组合结果。
CompletableFuture.allOf 静态方法允许等待 所有的Futures完成 作为var-arg:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
请注意,CompletableFuture.allOf()的返回类型是CompletableFuture <Void>。
这种方法的局限性在于它不会返回所有Futures的组合结果。相反,您必须手动从Futures获取结果。
幸运的是, CompletableFuture.join() 方法及 Java 8 Streams API 使之变得简单:
1
2
3
4
5
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
CompletableFuture.join() 类似于 get 方法,但是如果Future没有正常完成,它会抛出 unchecked exception。这使得它可以在Stream.map()方法中用作方法引用。
9. Handling Errors
对于异步计算步骤链中的错误处理,必须以类似的方式适用throw / catch惯用语法。 CompletableFuture类允许您在特殊的 handle 方法中处理它,而不是在语法块中捕获异常。此方法接收两个参数:计算结果(如果成功完成)和抛出异常(如果某些计算步骤未正常完成)。
在下面的示例中,异步计算因出错完成 (由于 name == null),我们使用handle方法提供一个默认值:
1
2
3
4
5
6
7
8
9
10
11
12
13
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if
(name == null) {
throw
new
RuntimeException("Computation error!");
}
return
"Hello, "
+ name;
})}).handle((s, t) -> s != null
? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
作为替代方案,假设我们想要使用值手动完成Future,如第一个示例中所示,但也可以使用异常来完成它。 completeExceptionally 方法适用于此。以下示例中的completableFuture.get() 方法抛出 ExecutionException,其RuntimeException为其原因:
1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> completableFuture = new
CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new
RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
在上面的示例中,我们可以使用handle方法异步处理异常,但是使用get方法,我们可以使用更典型的同步异常处理方法。
多数CompletableFuture类中的API的方法都有两个带有Async后缀的附加变体。这些方法通常用于在另一个线程中运行相应的执行步骤。
这是一个使用Function instance处理计算结果的修改示例。唯一可见的区别是thenApplyAsync方法。但在幕后,函数的应用程序被包装到 ForkJoinTask 实例中(有关fork / join框架的更多信息,请参阅文章“Guide to the Fork/Join Framework in Java”)。这样可以进一步并行化您的计算并更有效地使用系统资源。
1
2
3
4
5
6
7
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
New instance APIs have been introduced:
We also now have a few static utility methods:
Finally, to address timeout, Java 9 has introduced two more new functions:
Here’s the detailed article for further reading: Java 9 CompletableFuture API Improvements.
In this article we’ve described the methods and typical use cases of the CompletableFuture class.
The source code for the article is available over on GitHub.