This is intended as quick guide to getting started with futures in Java. It includes some theory, some best practices, exercise material and known issues to keep in mind. Hopefully all of this can be useful to get a better understanding of how futures work in Java 8+.
Let us start with defining what a future is.
A future is a container for a value, similar to an AtomicReference<T>
.
It has two fundamental states - incomplete (a value has not been set yet) and complete (a specific value has been set).
The purpose of a future is to enable asynchronous programming. Instead of blocking a thread, waiting for some expensive or time-consuming operation to complete, you can finish immediately and defer the next step of the process until the future is complete.
You can think of futures from two sides - producers and consumers.
We usually spend most of the time using futures as consumers, and then we usually seem them as read-only containers that change state once. We can subscribe to be notified when the state changes and trigger further work.
From a producer side, we should instead use the term Promise. You create a promise that you will produce a value and some time later when the value is ready, you complete or fulfill the promise. The future associated with the promise will thus complete and the consumer can act on it.
You can think of it as the following flow:
- The producer creates a Promise
- The produces extracts the Future from the Promise and gives the Future to the consumer.
- The consumer starts registering callbacks and transformations on the future.
- At some point later the producer fulfills the Promise and the callbacks will trigger for the consumer.
(There is a good wikipedia article about this if you want to read more about it)
You can think of futures in Java as monads, similar to the ´Optional` class. It's a value container that you can apply transformations on to get a different container with a different value.
If you look at futures from the consumer side and focus on the transformation operations, you can see them as immutable. The futures themselves don't change, they just contain a value. From the consumer side, it doesn't matter if the futures are complete or not, they will eventually apply the transformations.
This also means that you shouldn't mix futures with mutable objects - that introduces risks of race condition related bugs!
Experiment: MutableTest
Java 1.5 introduced the Future
interface but it was a very limited - you could call get()
and isDone()
but there was no way to get notified of state changes. Instead you would need to do polling.
Google Guava introduced the ListenableFuture
interface which extends Future
but also adds
addListener(Runnable, executor)
- this one method enabled a more asynchronous development model.
All other useful methods could now be implemented as utility functions on top of the primitives.
The Google Guava class Futures
included some very useful methods:
addCallback(callback)
- convenience method on top ofaddListener
transform(function)
- return a new future after applying a function to the valuestransformAsync(function)
- like transform, but the function should return a future instead. (Similar to Java 8thenCompose
)
Then with Java 8 we got CompletableFuture
which had achieved feature parity with Google Guava futures,
though with differences in:
- Fluent API -
CompletableFuture
has a fluent API unlikeListenableFuture
(but Google later addedFluentFuture
too) - Mutability - Google futures separates the producer side from the consumer side, while they are much more tightly coupled
in
CompletableFuture
. - Number of primitives - Google futures have a small set of primitives (transform, transformAsync, catching) that
can be combined while
CompletableFuture
has a large set of methods for combinations of the underlying primitives. thenApply, thenRun and thenAccept could be reduced to a single promitive and likewise for handle, runAfter, runAsync, whenComplete. Almost all methods inCompletableFuture
also exist in three variants: regular, async with default executor and async with custom executor.
This guide will focus on Java8+ futures since it is now the defacto standard, but most of the best practices also apply to other types of futures.
Unfortunately, the Java 8 futures doesn't really comply with the strict definitions above.
- They are not immutable and write-once.
- Producers and consumers are not really distinct - consumers can also set the state.
That said, if you use them responsibly you can still treat them as immutable and write-once.
Java 8+ defines the class CompletableFuture
which implements CompletionStage
and this class can be
seen as a both a Promise and a Future. It has all the methods necessary to fulfill a promise and to inspect
the state of the future (is it complete yet?)
It also defines the interface CompletionStage
which is intended as a read-only view of future.
It contains all the methods needed to perform callbacks on a future and it does not have methods that can:
- set the state (
complete(value)
,obtrudeValue(value)
, etc.) - query the state (
get()
,join()
,isDone()
, etc.)
However, it does have the method toCompletableFuture()
which means that it is in practice trivial to get back as
a future form.
You could also have other implementations of CompletionStage
which makes it possible to hide the mutability,
but that is rarely used.
A future can be completed in two ways, successful and failed.
For a successful completion, the future will contain a specific value.
For a failed completion, the future will contain a specific Throwable
instance.
If the future fails by throwing inside a transform, or set via completeExceptionally
,
they will be wrapped in a CompletionException
.
If the future fails due to a cancellation, it will be mapped to a CancellationException
instead.
Experiment: ExceptionTest
CompletableFuture has the following static methods:
supplyAsync(Supplier<T>)
andsupplyAsync(Supplier<T>, Executor)
- Create a future from code that runs on a different threadrunAsync(Runnable, Executor)
andrunAsync(Runnable)
- Similar to supplyAsync, but returns a void futurecompletedFuture(T)
- Returns a future that's already completeallOf(CompletableFuture...)
- A future that completes when all futures completeanyOf(CompletableFuture...)
- Similar to allOf, but returns as soon as any of the futures complete
Methods for introspecting the state of a future:
isDone()
,isCancelled()
andisCompletedExceptionally()
- Self-explanatory?getNumberOfDependents()
- Estimated number of callbacks for future completion (For monitoring only!)
Methods for extracting the value of a future:
getNow(T default)
- Non-blocking - returns the default value if it is not complete.get()
- Blocking get - throws checked exceptionsget(long, TimeUnit)
- Blocking get, but with a timeoutjoin()
- Blocking get - throws unchecked exceptions
Methods for setting the state of the future
complete(Object)
,completeExceptionally(Throwable)
andcancel(boolean)
- Complete the future in various waysobtrudeValue(Object)
andobtrudeException(Throwable)
- Complete the future, mutating the value it even if it is already completed!
It has a bunch of methods that operate on a future to create a new (and improved!) future:
thenApply(Function)
- transform value -> valuethenApplyAsync(Function, Executor)
-thenApplyAsync(Function)
-thenAccept(Consumer)
-thenAcceptAsync(Consumer, Executor)
-thenAcceptAsync(Consumer)
-thenRun(Runnable)
-thenRunAsync(Runnable, Executor)
-thenRunAsync(Runnable)
-thenCombine(CompletionStage, BiFunction)
-thenCombineAsync(CompletionStage, BiFunction, Executor)
-thenCombineAsync(CompletionStage, BiFunction)
-thenAcceptBoth(CompletionStage, BiConsumer)
-thenAcceptBothAsync(CompletionStage, BiConsumer, Executor)
-thenAcceptBothAsync(CompletionStage, BiConsumer)
-runAfterBoth(CompletionStage, Runnable)
-runAfterBothAsync(CompletionStage, Runnable, Executor)
-runAfterBothAsync(CompletionStage, Runnable)
-applyToEither(CompletionStage, Function)
-applyToEitherAsync(CompletionStage, Function, Executor)
-applyToEitherAsync(CompletionStage, Function)
-acceptEither(CompletionStage, Consumer)
-acceptEitherAsync(CompletionStage, Consumer, Executor)
-acceptEitherAsync(CompletionStage, Consumer)
-runAfterEither(CompletionStage, Runnable)
-runAfterEitherAsync(CompletionStage, Runnable, Executor)
-runAfterEitherAsync(CompletionStage, Runnable)
-thenCompose(Function)
- transform value -> CompletionStage(value)thenComposeAsync(Function, Executor)
-thenComposeAsync(Function)
-whenComplete(BiConsumer)
-whenCompleteAsync(BiConsumer, Executor)
-whenCompleteAsync(BiConsumer)
-handle(BiFunction)
- transform either value or exceptionhandleAsync(BiFunction, Executor)
-handleAsync(BiFunction)
-exceptionally(Function)
- transform exception
There are a lot of methods here - most of them can be expressed in terms of handle
and thenCompose
and can be considered convenience methods and to better express intent.
Some of them can be considered callbacks instead of transforms, but they still return futures so you can take actions on the completion, even if you don't care about their values.
Most transform methods come in three forms. Let's use thenApply
as an example.
It has the three methods thenApply(function)
, thenApplyAsync(function)
and thenApplyAsync(function, executor)
.
thenApply(function)
will either execute the function on the same thread that completed the parent future or execute
on the same thread that added the callback to the parent future.
Which one it is depends on if the callback was added before or after the future was completed.
Note that it is not always trivial to determine which case it is!
thenApplyAsync(function)
is equivalent to
thenApplyAsync(function, defaultExecutor())
which translates to ForkJoinPool.commonPool()
in most cases.
(See javadocs for more information)
thenApplyAsync(function, executor)
will schedule the function to be executed on the specified executor.
This means that for a sequence of future.thenApplyAsync(function, executor).thenApply(otherFunction)
you can not know for sure which thread otherFunction
will be executed on.
Experiment: WhereDoesItRunTest
If you want to ensure that work is being done on a specific executor, you must use the *Async
method.
If you just want to move work away from the current executor, it's enough to use *Async
method for the first
step and then use the regular methods for further calls in the chain.
Note that this only works as long as the *Async
method is being invoked at all! If you're using thenApplyAsync
but the parent future has completed exceptionally, the step will be skipped, and thus the work won't move to the
specified executor.
To ensure that it always moves, I recommend using whenCompleteAsync(() -> {}, executor)
instead.
Experiment: MoveExecutorTest
Once a future has been completed, it should not be set again, but you can in fact do it.
You can use obtrudeValue
and obtrudeException
to overwrite the value of a future.
If the future was already complete when it was obtruded, dependent futures will not be affected.
It's unclear in what circumstances these methods are useful.
Experiment: ObtrudeTest
Cancelling a future is the same as completing it exceptionally with a CancellationException. The only difference is that the exact future that was cancelled will emit a CancellationException directly, while child futures will have that exception wrapped in a CompletionException.
Experiment: CancelTest
To simplify life working with Java 8 futures, there's a Spotify library called [completable-futures)(https://github.com/spotify/completable-futures).
It contains some useful utility methods, described in more detail below
Sometimes (through no fault of you own, I'm sure!) you may end up with something like:
CompletableFuture<CompletableFuture<T>> future
which may be annoying to work with.
Fortunately, it's not very difficult to convert that to CompletableFuture<T> future2
.
All you need to do is apply thenCompose(value -> value)
(composing with the identify function).
This has been wrapped as dereference
- naming was chosen to correspond to the equivalent function in Google Guava.
exceptionallyCompose()
was introduced to cover the usecase of handling a failure by doing more asynchronous work.
While the Java 8 API lets you handle a successful future and compose it (with thenCompose
) there is no such
equivalent for handling exception. This convenience method is very simple to implement.
Implementation: ExceptionallyCompose
It also includes some utility functions to combine multiple futures in a safe way, avoiding manual error-prone calls to get or join.
Writing unit tests with futures can be tricky due to the asynchronous nature of futures and the fact that tests should run quickly and deterministically.
When possible, it's usually a good idea to pass in already completed futures to the tests and then
when verifying the result, you should use something like CompletableFutures.getCompleted(future)
.
This ensures that the future was in fact complete and will trigger an exception if it was not.
If you instead call something like future.get()
or future.join()
you risk
blocking on an incomplete future - perhaps infinitely!
You also lose the benefit of being able to detect if the future is not immediately complete. If you pass in already completed futures to the code under test, the resulting future should also be immediately completed.
For more complex scenarios this is of course not always possible.
Due to the fluent style of the future API, it's easy to think of futures as a sequence of transformation, but we don't have to limit ourselves to that.
Multiple child futures can depend on the same parent future, and a child future can depend on multiple parent futures. This means we should think of it as a Directed Acyclic Graph (a DAG!) instead!
Each future is a node in the graph, and each dependency is a directed edge.
To create futures that depend on multiple other futures, you have multiple options:
- nest the dependencies by using
thenCompose
andthenApply
- use
thenCombine
to pairwise combine futures - use
CompletableFuture.allOf()
to combine a list of futures. - use a library such as CompletableFutures to combine up to 6 futures
Experiment: CombineTest
There are some usability problems with using allOf
, so I recommend avoiding it.
You get a CompletableFuture<Void>
back so it is only useful to determine when the dependencies are complete,
and get a callback when it is ready. Inside the transformation or callback you then have to call join on the input futures.
This works, but has several potentials for bugs:
- If you make a mistake, you may be joining on a future that is not complete - this will block in the thread!
- If you make a mistake, you may be joining on a future that will never complete - this will deadlock the thread!
- The mistake may not be obvious, and not happen on each execution, making it hard to detect and debug.
- If your dependencies change, you might forget to remove one of the calls to joins.
If you still want to use allOf()
, at least make sure to replace the calls to join (or get) with calls to
CompletableFutures.getCompleted()
which is guaranteed to never block. It will instead fail if the future is not completed.
Experiment: AllOfTest
With Java 9 we got some additions to the API.
New static methods:
delayedExecutor(long, TimeUnit)
anddelayedExecutor(long, TimeUnit, Executor)
- Get an executor that delays the workcompletedStage(Object)
,failedFuture(Throwable)
andfailedStage(Throwable)
- Complements for thecompletedFuture(Object)
method
New instance methods:
newIncompleteFuture()
- Virtual constructor, intended for subclassingdefaultExecutor()
- Get the default executor that is used for default*Async
transforms.copy()
- Convenience method forthenApply(x -> x)
minimalCompletionStage()
- Returns a more restricted futurecompleteAsync(Supplier, Executor)
andcompleteAsync(Supplier)
- Complete the future using a suppliercompleteOnTimeout(Object, long, TimeUnit)
- Complete the future with a value after a timeoutorTimeout(long, TimeUnit)
- Complete the future with an exception after a timeout
orTimeout
andcompleteOnTimeout
does not let you configure an executor, so you have to manually add something like:.whenCompleteAsync(()->{}, executor)
Otherwise you may be bottlenecking on a static single-threaded ScheduledExecutorService
.
Experiment: OnTimeoutTest
workarounds for timeout-issue Explain minimalCompletionStage and delayedExecutor