https://dzone.com/articles/20-examples-of-using-javas-completablefuture
by Mahmoud Anouti · Feb. 02, 18 · Java Zone · Tutorial
Join the DZone community and get the full member experience. JOIN FOR FREE
This post revisits Java 8’s CompletionStage
API and specifically its implementation in the standard Java library CompletableFuture
. The API is explained by examples that illustrate the various behaviors, where each example focuses on one or two specific behaviors.
Since the CompletableFuture
class implements the CompletionStage
interface, we first need to understand the contract of that interface. It represents a stage of a certain computation which can be done either synchronously or asynchronously. You can think of it as just a single unit of a pipeline of computations that ultimately generates a final result of interest. This means that several CompletionStage
s can be chained together so that one stage’s completion triggers the execution of another stage, which in turn triggers another, and so on.
In addition to implementing the CompletionStage
interface, CompletableFuture
also implements Future
, which represents a pending asynchronous event, with the ability to explicitly complete this Future, hence the name CompletableFuture.
The simplest example creates an already completed CompletableFuture
with a predefined result. Usually, this may act as the starting stage in your computation.
static void completedFutureExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message");
assertTrue(cf.isDone());
assertEquals("message", cf.getNow(null));
The getNow(null)
returns the result if completed (which is obviously the case), but otherwise returns null (the argument).
}
The next example is how to create a stage that executes a Runnable
asynchronously:
static void runAsyncExample() {
CompletableFuture cf = CompletableFuture.runAsync(() -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
});
assertFalse(cf.isDone());
The takeaway of this example is two things:
sleepEnough();
Async
assertTrue(cf.isDone());
Executor
is specified), asynchronous execution uses the common ForkJoinPool
implementation, which uses daemon threads to execute the Runnable
task. Note that this is specific to CompletableFuture
. Other CompletionStage
implementations can override the default behavior.}
The below example takes the completed CompletableFuture
from example #1, which bears the result string "message"
and applies a function that converts it to uppercase:
static void thenApplyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
assertFalse(Thread.currentThread().isDaemon());
return s.toUpperCase();
});
Note the behavioral keywords in thenApply
:
assertEquals("MESSAGE", cf.getNow(null));
}
Function
on the result of the previous stage.The execution of the Function
will be blocking, which means that getNow() will only be reached when the uppercase operation is done.
By appending the Async
suffix to the method in the previous example, the chained CompletableFuture
would execute asynchronously (using ForkJoinPool.commonPool()
).
static void thenApplyAsyncExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
});
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
}
A very useful feature of asynchronous methods is the ability to provide an Executor
to use it to execute the desired CompletableFuture
. This example shows how to use a fixed thread pool to apply the uppercase conversion Function
:
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
int count = 1;
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "custom-executor-" + count++);
}
});
static void thenApplyAsyncWithExecutorExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
assertFalse(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
}, executor);
If the next stage accepts the result of the current stage but does not need to return a value in the computation (i.e. its return type is void), then instead of applying a Function
, it can accept a Consumer
, hence the method thenAccept
:
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
static void thenAcceptExample() {
}
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture("thenAccept message")
.thenAccept(s -> result.append(s));
assertTrue("Result was empty", result.length() > 0);
The Consumer
will be executed synchronously, so we don’t need to join on the returned CompletableFuture
.
}
Again, using the async version of thenAccept
, the chained CompletableFuture
would execute asynchronously:
static void thenAcceptAsyncExample() {
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
.thenAcceptAsync(s -> result.append(s));
cf.join();
assertTrue("Result was empty", result.length() > 0);
}
Now let us see how an asynchronous operation can be explicitly completed exceptionally, indicating a failure in the computation. For simplicity, the operation takes a string and converts it to uppercase, and we simulate a delay in the operation of 1 second. To do that, we will use the thenApplyAsync(Function, Executor)
method, where the first argument is the uppercase function, and the executor is a delayed executor that waits for 1 second before actually submitting the operation to the common ForkJoinPool
.
static void completeExceptionallyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
cf.completeExceptionally(new RuntimeException("completed exceptionally"));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
try {
cf.join();
fail("Should have thrown an exception");
Let’s examine this example in detail:
} catch(CompletionException ex) { // just for testing
CompletableFuture
that is already completed with the value "message"
. Next, we call thenApplyAsync
, which returns a new CompletableFuture
. This method applies an uppercase conversion in an asynchronous fashion upon completion of the first stage (which is already complete, thus the Function
will be immediately executed). This example also illustrates a way to delay the asynchronous task using the delayedExecutor(timeout, timeUnit)
method.assertEquals("completed exceptionally", ex.getCause().getMessage());
}
assertEquals("message upon cancel", exceptionHandler.join());
}
exceptionHandler
, that handles any exception by returning another message "message upon cancel"
.join()
method on the stage, which is doing the uppercase operation, throw a CompletionException
(normally join()
would have waited for 1 second to get the uppercase string). It will also trigger the handler stage.Very close to exceptional completion, we can cancel a computation via the cancel(boolean mayInterruptIfRunning)
method from the Future
interface. For CompletableFuture
, the boolean parameter is not used because the implementation does not employ interrupts to do the cancelation. Instead, cancel()
is equivalent to completeExceptionally(new CancellationException())
.
static void cancelExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
assertTrue("Was not canceled", cf.cancel(true));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
assertEquals("canceled message", cf2.join());
}
The below example creates a CompletableFuture
that applies a Function
to the result of either of two previous stages (no guarantees on which one will be passed to the Function
). The two stages in question are: one that applies an uppercase conversion to the original string and another that applies a lowercase conversion:
static void applyToEitherExample() {
String original = "Message";
CompletableFuture cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s));
CompletableFuture cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> s + " from applyToEither");
assertTrue(cf2.join().endsWith(" from applyToEither"));
Similar to the previous example, but using a Consumer
instead of a Function
(the dependent CompletableFuture
has a type void):
}
static void acceptEitherExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> result.append(s).append("acceptEither"));
cf.join();
assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
This example shows how the dependent CompletableFuture
that executes a Runnable
triggers upon completion of both of two stages. Note that all the stages below run synchronously, where a stage first converts a message string to uppercase, then a second converts the same message string to lowercase.
}
static void runAfterBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
() -> result.append("done"));
assertTrue("Result was empty", result.length() > 0);
}
Instead of executing a Runnable
upon completion of both stages, using BiConsumer
allows processing of their results if needed:
static void thenAcceptBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s1, s2) -> result.append(s1 + s2));
assertEquals("MESSAGEmessage", result.toString());
}
If the dependent CompletableFuture
is intended to combine the results of two previous CompletableFuture
s by applying a function on them and returning a result, we can use the method thenCombine()
. The entire pipeline is synchronous, so getNow()
at the end would retrieve the final result, which is the concatenation of the uppercase and the lowercase outcomes.
static void thenCombineExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.getNow(null));
}
Similar to the previous example, but with a different behavior: since the two stages upon which CompletableFuture
depends both run asynchronously, the thenCombine()
method executes asynchronously, even though it lacks the Async
suffix. This is documented in the class Javadocs: “Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.” Therefore, we need to join()
on the combining CompletableFuture
to wait for the result.
static void thenCombineAsyncExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.join());
}
We can use composition using thenCompose()
to accomplish the same computation done in the previous two examples. This method waits for the first stage (which applies an uppercase conversion) to complete. Its result is passed to the specified Function
, which returns a CompletableFuture
, whose result will be the result of the returned CompletableFuture
. In this case, the Function takes the uppercase string (upper
), and returns a CompletableFuture
that converts the original
string to lowercase and then appends it to upper
.
static void thenComposeExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
.thenApply(s -> upper + s));
assertEquals("MESSAGEmessage", cf.join());
}
The below example illustrates how to create a CompletableFuture
that completes when any of several CompletableFuture
s completes, with the same result. Several stages are first created, each converting a string from a list to uppercase. Because all of these CompletableFuture
s are executing synchronously (using thenApply()
), the CompletableFuture
returned from anyOf()
would execute immediately, since by the time it is invoked, all stages are completed. We then use the whenComplete(BiConsumer<? super Object, ? super Throwable> action)
, which processes the result (asserting that the result is uppercase).
static void anyOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
if(th == null) {
assertTrue(isUpperCase((String) res));
result.append(res);
}
The next two examples illustrate how to create a CompletableFuture
that completes when all of several CompletableFuture
s completes, in a synchronous and then asynchronous fashion, respectively. The scenario is the same as the previous example: a list of strings is provided where each element is converted to uppercase.
});
assertTrue("Result was empty", result.length() > 0);
}
static void allOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
By switching to thenApplyAsync()
in the individual CompletableFuture
s, the stage returned by allOf()
gets executed by one of the common pool threads that completed the stages. So we need to call join()
on it to wait for its completion.
assertTrue("Result was empty", result.length() > 0);
}
static void allOfAsyncExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List<CompletableFuture> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
Now that the functionality of CompletionStage
and specifically CompletableFuture
is explored, the below example applies them in a practical scenario:
allOf.join();
Car
objects asynchronously by calling the cars()
method, which returns a CompletionStage<List>
. The cars()
method could be consuming a remote REST endpoint behind the scenes.assertTrue("Result was empty", result.length() > 0);
}
CompletionStage<List>
that takes care of filling the rating of each car, by calling the rating(manufacturerId)
method which returns a CompletionStage
that asynchronously fetches the car rating (again could be consuming a REST endpoint).Car
objects are filled with their rating, we end up with a List<CompletionStage>
, so we call allOf()
to get a final stage (stored in variable done
) that completes upon completion of all these stages.whenComplete()
on the final stage, we print the Car
objects with their rating.cars().thenCompose(cars -> {
List<CompletionStage> updatedCars = cars.stream()
.map(car -> rating(car.manufacturerId).thenApply(r -> {
car.setRating(r);
return car;
})).collect(Collectors.toList());
CompletableFuture done = CompletableFuture
.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
Since the Car
instances are all independent, getting each rating asynchronously improves performance. Furthermore, waiting for all car ratings to be filled is done using a more natural allOf()
method, as opposed to manual thread waiting (e.g. using Thread#join()
or a CountDownLatch
).
if (th == null) {
cars.forEach(System.out::println);
Working through these examples helps better understand this API. You can find the full code of these examples on GitHub.
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join();