Futures, cancellation and coroutines

Roman Elizarov
Oct 13, 2018

Programming languages often use futures / promises to model asynchronous computations. Futures are quite an old invention that requires no support from the language itself, so asynchrony can be supported as a pure library solution. Using a future type, an asynchronous function is declared like this:

fun doSomethingAsync(): CompletableFuture<SomeData>

You might think that this asynchronous function can be used like this:

val futureOne = doSomethingAsync()
val two = doSomethingElse() // while doSomethingAsync is working
futureOne.whenComplete { result, error -> /* process success or failure */ }

This code is simple and understandable, yet wrong. If it fails for any reason in doSomethingElse, then two bad things happen. First, doSomethingAsync continues to work in the background (it leaks), consuming resources. Second, if doSomethingAsync also happens to fail, its failure is not handled in any way, since the futureOne reference is lost.
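To make the second problem concrete, here is a minimal sketch in which both calls fail. The bodies of doSomethingAsync and doSomethingElse, as well as the naiveUsage helper, are hypothetical stand-ins invented for this illustration.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in: an asynchronous operation that fails.
fun doSomethingAsync(): CompletableFuture<String> =
    CompletableFuture.supplyAsync<String> { throw IllegalStateException("async part failed") }

// Hypothetical stand-in: a synchronous operation that fails, too.
fun doSomethingElse(): String = throw IllegalArgumentException("sync part failed")

// Returns the failures that the caller actually got to observe.
fun naiveUsage(): List<String> {
    val observed = mutableListOf<String>()
    try {
        val futureOne = doSomethingAsync()
        val two = doSomethingElse() // throws before the handler below is attached
        futureOne.whenComplete { _, error -> if (error != null) observed += "async" }
    } catch (e: IllegalArgumentException) {
        observed += "sync"
    }
    return observed
}

fun main() {
    println(naiveUsage()) // only the synchronous failure is reported
}
```

The failure of the asynchronous part never reaches any handler, because the handler is never attached.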

Future combinators

This problem is usually worked around by the following convention. Whenever an asynchronous function like doSomethingAsync is called, its result is immediately passed to some combinator function that is responsible for the proper propagation of failures. There are predefined combinators for all kinds of patterns. For example, one way to correctly implement the above pattern is:

doSomethingAsync().thenAcceptBoth(CompletableFuture.supplyAsync {
    doSomethingElse()
}) { one, two ->
    // combine results of both calls here
}.whenComplete { _, error -> /* process success or failure */ }

Yuk! But it does the trick of making sure that a failure in either one or the other gets processed correctly. Moreover, this way we can build higher-level abstractions from lower-level ones and introduce new async functions. You even get a kind of Stockholm syndrome here: it starts to look nicer if you write all your functions in async style:

fun doBothAsync(): CompletableFuture<SomeOtherResult> =
    doSomethingAsync()
        .thenCombine(doSomethingElseAsync()) { one, two ->
            // combine results of both calls here and return the combined value
        }
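Here is a small runnable sketch of this style, assuming Int results so that there is something to combine. The function bodies, the fail flag and the describe helper are hypothetical and exist only for this illustration; thenCombine and handle are the standard CompletableFuture combinators.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the asynchronous functions in the text.
fun doSomethingAsync(): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { 20 }

fun doSomethingElseAsync(fail: Boolean): CompletableFuture<Int> =
    CompletableFuture.supplyAsync {
        if (fail) throw IllegalStateException("second call failed")
        22
    }

// The combinator produces a new future that fails if either source fails.
fun doBothAsync(fail: Boolean): CompletableFuture<Int> =
    doSomethingAsync().thenCombine(doSomethingElseAsync(fail)) { one, two -> one + two }

// Processes success or failure in one place, at the end of the chain.
fun describe(fail: Boolean): String =
    doBothAsync(fail)
        .handle { sum, error ->
            // The failure may arrive wrapped, so look at the cause first.
            if (error != null) "failed: ${error.cause?.message ?: error.message}"
            else "sum = $sum"
        }
        .join()

fun main() {
    println(describe(fail = false))
    println(describe(fail = true))
}
```

Because the second future is handed straight to the combinator, its failure ends up in the handler at the end of the chain instead of getting lost.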

Combinators play well with the concept of cancellation. All combinators, by convention, chain in such a way that when the resulting future is cancelled, all the source futures are cancelled, too. This effectively makes CompletableFuture (and similar types) one-shot futures by convention. While you can technically use a single CompletableFuture in many different places and attach multiple independent combinators to it, you don’t (and should not) do so in practice, since cancellation by just one consumer would irrevocably cancel the source future.

The downside of this async style of programming with combinators is that you are forced to write all your asynchronous code in a different way from your synchronous code, a.k.a. the colored function problem. Even simple things, like sequential composition (doing something first and then doing something else with the result), become an exercise in applying combinators:

fun doSequentiallyAsync(): CompletableFuture<SomeOtherResult> =
    doSomethingAsync()
        .thenCompose { something ->
            doSomethingElseAsync(something)
        }

Kotlin coroutines

You are lucky if there is a combinator for the problem you have at hand (and if you can find it). Kotlin coroutines were designed to help, so that you can write this kind of code in direct style, given that you declare all asynchronous (long-running) functions as suspending functions.

suspend fun doSomething(): SomeData

Sequential composition becomes straightforward with Kotlin coroutines:

suspend fun doSequentially() {
    val something = doSomething()
    doSomethingElse(something)
}

Parallel composition with Kotlin coroutines is not that much harder, either, but requires you to be explicit about parallelism and about the scope of your parallel code:

suspend fun doBoth() = coroutineScope {
    val deferredOne = async { doSomething() }
    val two = doSomethingElse() // while doSomething is working
    val one = deferredOne.await()
    // combine results of both calls here
}

The code above looks like the simple version of the code with futures that we started with. Unlike that code with futures, it is not only simple, but also correct with respect to failures and cancellation, due to the way coroutineScope cancels all child coroutines on failure.
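The following sketch lets you observe that behavior. It assumes the kotlinx.coroutines library is on the classpath; the function bodies, the onCancelled callback and the demo helper are hypothetical and were added only to make the cancellation visible.

```kotlin
import kotlinx.coroutines.*

// Hypothetical long-running operation that reports when it gets cancelled.
suspend fun doSomething(onCancelled: () -> Unit): Int =
    try {
        delay(10_000) // simulates a long-running operation
        1
    } catch (e: CancellationException) {
        onCancelled()
        throw e // cancellation must be rethrown
    }

// Hypothetical operation that fails shortly after it starts.
suspend fun doSomethingElse(): Int {
    delay(50)
    throw IllegalStateException("doSomethingElse failed")
}

suspend fun doBoth(onCancelled: () -> Unit): Int = coroutineScope {
    val deferredOne = async { doSomething(onCancelled) }
    val two = doSomethingElse() // fails while doSomething is working
    val one = deferredOne.await()
    one + two
}

fun demo(): String = runBlocking {
    var childCancelled = false
    val outcome = try {
        doBoth { childCancelled = true }
        "completed"
    } catch (e: IllegalStateException) {
        "failed: ${e.message}"
    }
    "$outcome, child cancelled = $childCancelled"
}

fun main() {
    println(demo())
}
```

The failure of doSomethingElse reaches the caller of doBoth only after the async child has been cancelled, so nothing is left running in the background.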

Integration with futures

In reality, we often have to integrate our Kotlin code with JVM libraries that represent asynchrony via CompletableFuture or a similar future type. We use the await() extension function on the CompletableFuture type for that:

val something = doSomethingAsync().await()

In order to properly integrate with cancellation, CompletableFuture.await() uses the same convention as all future combinators do: it cancels the underlying future if the await call itself is cancelled. It also means that await() should be used just like the other future combinators. It should be applied immediately to the result of an async function in order to avoid the risk of leaking the underlying asynchronous operation on failure.
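A minimal sketch of this convention follows. It assumes a kotlinx.coroutines version that provides kotlinx.coroutines.future.await (originally shipped in the separate kotlinx-coroutines-jdk8 module); the never-completed future and the awaitCancelsFuture helper are hypothetical stand-ins for a slow doSomethingAsync call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture

// Returns true if cancelling the awaiting coroutine also cancelled the future.
fun awaitCancelsFuture(): Boolean = runBlocking {
    // Never completed here: stands in for a slow asynchronous operation.
    val future = CompletableFuture<String>()
    val job = launch { future.await() }
    yield() // let the launched coroutine start and suspend in await()
    job.cancelAndJoin()
    future.isCancelled
}

fun main() {
    println(awaitCancelsFuture())
}
```

Cancelling the coroutine that is suspended in await() leaves the future in the cancelled state, which is why a future passed to await() should not be shared with other consumers.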

This behavior was one of the most recent and important changes to kotlinx.coroutines before its upcoming 1.0.0 release. It completes the structured concurrency story and provides reliable cancellation and exception handling on the integration boundary.

Deferred

The Kotlin coroutines library has its own future type called Deferred. Instances of Deferred in Kotlin are usually created by the async { ... } coroutine builder and serve two primary use-cases. The first use-case, parallel decomposition, we’ve already seen. Cancellation in this use-case is handled via coroutineScope { ... }. You don’t have to immediately apply combinators to a Deferred. A scoped Deferred is fail-safe.

The second use-case for Deferred is to represent multi-use asynchronous caches. In this case, GlobalScope or some other long-running scope is used to cache a computed value for a long time:

val value = GlobalScope.async { doSomething() }

The fail-safe property of a scoped Deferred, together with the asynchronous cache use-case, dictates that cancelling a Deferred.await() call does not cancel the underlying coroutine.
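This difference from CompletableFuture.await() can be checked with a short sketch. It assumes the kotlinx.coroutines library; the cached computation and the awaitDoesNotCancelDeferred helper are hypothetical, and the @OptIn annotation is there only because recent library versions mark GlobalScope as a delicate API.

```kotlin
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
fun awaitDoesNotCancelDeferred(): String = runBlocking {
    // Hypothetical cached computation that takes a while to finish.
    val value = GlobalScope.async {
        delay(200)
        "cached"
    }
    val consumer = launch { value.await() }
    yield() // let the consumer start and suspend in await()
    consumer.cancelAndJoin() // cancel only this one consumer
    val stillActive = value.isActive
    // Another consumer can still get the result later.
    "active after consumer cancelled = $stillActive, value = ${value.await()}"
}

fun main() {
    println(awaitDoesNotCancelDeferred())
}
```

Only the waiting consumer is cancelled. The computation itself keeps running and stays available to any other consumer of the cache.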

Reactive futures

There is a neat trick employed in Reactive libraries (like Project Reactor and Rx). They define their future types (like Mono and Single) as cold futures, which do not run any code until asked for a result. An instance of a cold future neither leaks computation nor loses exceptions, because a cold doSomethingAsync would not actually do anything until a terminal operation is invoked on its result. So, that is another way to get fail-safety, but you still have to write all your code with combinators, just like with CompletableFuture.
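The idea can be sketched without any reactive library at all. The ColdFuture class below is a hypothetical toy written for this illustration; it is not the API of Reactor or Rx, and its subscribe function only plays the role of a terminal operation.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical minimal cold future: it only remembers what to compute.
class ColdFuture<T>(private val computation: () -> T) {
    // Terminal operation: the computation starts only when this is called.
    fun subscribe(): CompletableFuture<T> =
        CompletableFuture.supplyAsync { computation() }
}

// Hypothetical cold version of doSomethingAsync that counts its runs.
fun doSomethingAsync(runs: AtomicInteger): ColdFuture<Int> =
    ColdFuture {
        runs.incrementAndGet()
        42
    }

// Returns: runs before subscribing, the result, and runs after subscribing.
fun demo(): List<Int> {
    val runs = AtomicInteger(0)
    val cold = doSomethingAsync(runs)
    val runsBeforeSubscribe = runs.get() // nothing has been started yet
    val result = cold.subscribe().join()
    return listOf(runsBeforeSubscribe, result, runs.get())
}

fun main() {
    println(demo())
}
```

If the code fails between creating the cold future and subscribing to it, there is simply nothing running that could leak.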

Written by Roman Elizarov

Project Lead for the Kotlin Programming Language @JetBrains