Callbacks and Kotlin Flows

Roman Elizarov
5 min read · Jul 21, 2019


Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events, … without the program blocking to wait for them (from Wikipedia)


In programming languages without built-in support for asynchrony there are two patterns that are used to implement it: callbacks and futures (aka promises). In fact, callbacks are the basic primitives and futures in asynchronous programming are backed by callbacks.

For example, the Java 5 Future type does not support waiting for its completion without blocking; you can only wait by calling its blocking get method. Java 8, however, added the CompletableFuture type, whose whenComplete method installs a callback that is invoked on completion without blocking, thus making it usable for asynchronous programming.
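As a rough illustration of the callback style described above, here is a minimal sketch that calls whenComplete from Kotlin. The function name observeWithCallback and the latch are assumptions made for the demo; the latch exists only so that the function can hand the report back to its caller.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Completes a future from another thread and reports the outcome via a callback.
fun observeWithCallback(): String {
    val future = CompletableFuture<Int>()
    val done = CountDownLatch(1)
    var report = ""

    // Install the callback first; whenComplete returns immediately.
    future.whenComplete { value, error ->
        report = if (error != null) "failed: ${error.message}" else "value: $value"
        done.countDown()
    }

    // Complete the future asynchronously (here, from a plain thread).
    Thread { future.complete(42) }.start()

    // Demo only: real asynchronous code would continue inside the callback.
    done.await(5, TimeUnit.SECONDS)
    return report
}

fun main() {
    println(observeWithCallback()) // value: 42
}
```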

Kotlin Coroutines support asynchrony by allowing suspension of a coroutine without blocking¹. Coroutines integrate with asynchronous libraries on JVM via callbacks.

Single-shot callback

Let us consider a hypothetical Operation interface that has a method to perform an operation asynchronously and takes a callback parameter to report its completion with either a resulting value or an error:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
}

We can define a suspending extension function² in Kotlin to perform an operation without blocking by using suspendCoroutine from the standard library³:

import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

suspend fun <T> Operation<T>.perform(): T =
    suspendCoroutine { continuation ->
        performAsync { value, exception ->
            when {
                exception != null -> // operation had failed
                    continuation.resumeWithException(exception)
                else -> // succeeded, there is a value
                    continuation.resume(value as T)
            }
        }
    }

Notice that this perform is a cold source of values⁴. It does not do anything until it is called, and it does not do anything after it has returned, since it waits for the operation’s completion via a callback.
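The cold behaviour can be observed with nothing but the standard library. The sketch below is an illustration under assumptions: FixedValueOperation and runDemo are hypothetical names, and the coroutine is started with the low-level startCoroutine function instead of a coroutine builder so that no extra dependency is needed.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
}

suspend fun <T> Operation<T>.perform(): T =
    suspendCoroutine { continuation ->
        performAsync { value, exception ->
            when {
                exception != null -> continuation.resumeWithException(exception)
                else -> continuation.resume(value as T)
            }
        }
    }

// Hypothetical operation: records that it was started and reports a
// fixed value from a separate thread.
class FixedValueOperation(private val value: String) : Operation<String> {
    @Volatile var started = false
    override fun performAsync(callback: (String?, Throwable?) -> Unit) {
        started = true
        Thread { callback(value, null) }.start()
    }
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val operation = FixedValueOperation("done")
    val finished = CountDownLatch(1)
    val block: suspend () -> String = { operation.perform() }

    // Nothing has happened yet: perform() has not been called.
    log += "started before call: ${operation.started}"

    // Start the coroutine; the completion continuation receives the result.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        log += "result: ${result.getOrNull()}"
        finished.countDown()
    })
    finished.await(5, TimeUnit.SECONDS)
    return log
}

fun main() {
    println(runDemo()) // [started before call: false, result: done]
}
```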

Cancellable operation

It is a good engineering practice to provide some means of cancellation in an asynchronous API. For example, Operation might have a cancel method for this purpose:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // cancels ongoing operation
}

We can now implement perform as a cancellable suspending function using suspendCancellableCoroutine from the kotlinx.coroutines library:

import kotlinx.coroutines.suspendCancellableCoroutine

suspend fun <T> Operation<T>.perform(): T =
    suspendCancellableCoroutine { continuation ->
        performAsync { /* ... as before ... */ }
        continuation.invokeOnCancellation { cancel() }
    }
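To see the cancellation hook fire, here is a minimal sketch that cancels a coroutine while it is suspended in perform. It assumes the kotlinx.coroutines library is on the classpath; NeverCompletingOperation and cancelDemo are hypothetical names introduced for the demo.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // cancels ongoing operation
}

suspend fun <T> Operation<T>.perform(): T =
    suspendCancellableCoroutine { continuation ->
        performAsync { value, exception ->
            when {
                exception != null -> continuation.resumeWithException(exception)
                else -> continuation.resume(value as T)
            }
        }
        continuation.invokeOnCancellation { cancel() }
    }

// Hypothetical operation that never completes on its own.
class NeverCompletingOperation : Operation<String> {
    @Volatile var cancelled = false
    override fun performAsync(callback: (String?, Throwable?) -> Unit) {
        // Intentionally never invokes the callback.
    }
    override fun cancel() { cancelled = true }
}

fun cancelDemo(): Boolean = runBlocking {
    val operation = NeverCompletingOperation()
    val job = launch { operation.perform() }
    yield()             // let the launched coroutine start and suspend in perform()
    job.cancelAndJoin() // cancelling the coroutine triggers invokeOnCancellation
    operation.cancelled
}

fun main() {
    println(cancelDemo()) // true
}
```

The yield() call matters in this sketch: a coroutine that is cancelled before it starts never reaches perform, so there would be no operation to cancel.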

Multi-shot callback

But what if Operation delivers an asynchronous stream of values and calls the specified callback multiple times? It has to indicate its completion somehow, too. Let’s assume for this simple example that it does so by calling the callback with a null value.

We cannot use such an Operation with suspendCoroutine-like functions: Kotlin suspension and continuations are single-shot, so we get an IllegalStateException when we try to resume the continuation a second time.
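The single-shot restriction is easy to reproduce with the standard library alone. In this sketch (resumeTwice and doubleResumeFailure are hypothetical names), the same continuation is resumed twice, the way a multi-shot callback would do it, and the failure is captured by the completion continuation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Resumes the same continuation twice, as a multi-shot callback would.
suspend fun resumeTwice(): Int =
    suspendCoroutine { continuation ->
        continuation.resume(1)
        continuation.resume(2) // the second resume is rejected
    }

// Runs resumeTwice() and returns the exception it failed with, if any.
fun doubleResumeFailure(): Throwable? {
    var failure: Throwable? = null
    val block: suspend () -> Int = { resumeTwice() }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        failure = result.exceptionOrNull()
    })
    return failure
}

fun main() {
    println(doubleResumeFailure() is IllegalStateException) // true
}
```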

Kotlin Flow⁵ to the rescue! Flow is explicitly designed to represent a cold asynchronous stream of multiple values. We can use the callbackFlow function to convert a multi-shot callback into a flow:

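import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun <T : Any> Operation<T>.perform(): Flow<T> =
    callbackFlow {
        performAsync { value, exception ->
            when {
                exception != null -> // operation had failed
                    close(exception)
                value == null -> // operation had succeeded
                    close()
                else -> // there is a value
                    offer(value) // kotlinx.coroutines 1.5+ supersedes offer with trySend
            }
        }
        awaitClose { cancel() }
    }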

Notice a number of important differences. First of all, perform is no longer a suspending function. It does not wait for anything by itself. It returns a cold Flow. The code inside the callbackFlow { ... } block is not called until this flow is collected by a caller of a terminal operation.

As before, performAsync installs a callback, but now instead of a Continuation we are working with a hot SendChannel that is open to deliver values. So, the offer function is called for each value and the close function is called to signal a failure or a successful completion. Here awaitClose replaces invokeOnCancellation and also serves the important function of suspending the block inside callbackFlow while the values are incoming.
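Putting the pieces together, the following sketch collects such a flow with the toList terminal operation. It makes several assumptions: CountingOperation and collectDemo are hypothetical names, and trySend is used in place of offer because recent kotlinx.coroutines releases (1.5 and later) deprecate offer in its favour.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // cancels ongoing operation
}

fun <T : Any> Operation<T>.perform(): Flow<T> =
    callbackFlow {
        performAsync { value, exception ->
            when {
                exception != null -> close(exception) // operation had failed
                value == null -> close()              // operation had succeeded
                else -> trySend(value)                // newer counterpart of offer(value)
            }
        }
        awaitClose { cancel() }
    }

// Hypothetical multi-shot operation: reports 1..count from a separate
// thread, then signals completion with a null value.
class CountingOperation(private val count: Int) : Operation<Int> {
    override fun performAsync(callback: (Int?, Throwable?) -> Unit) {
        Thread {
            for (i in 1..count) callback(i, null)
            callback(null, null)
        }.start()
    }
    override fun cancel() { /* nothing to release in this demo */ }
}

// Nothing runs until toList() starts collecting the cold flow.
fun collectDemo(): List<Int> = runBlocking {
    CountingOperation(3).perform().toList()
}

fun main() {
    println(collectDemo()) // [1, 2, 3]
}
```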

Backpressure

What happens if performAsync delivers values to a callback faster than the collecting coroutine can process them? Enter the question of backpressure that always arises when dealing with asynchronous data streams. There is a buffer to keep a few values, but when this buffer overflows offer returns false and values are lost. There are several approaches to avoid or control the loss.
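The overflow itself can be shown on a bare channel. This sketch is an assumption-laden stand-in: a Channel with a capacity of 2 plays the role of the buffer inside callbackFlow, nobody receives from it, and trySend (the newer counterpart of offer) reports rejection through its result object instead of returning false. The function name acceptedSends is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to send `attempts` values into a buffered channel that nobody
// receives from, and records which sends were accepted.
fun acceptedSends(capacity: Int, attempts: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return (1..attempts).map { channel.trySend(it).isSuccess }
}

fun main() {
    // The first two values fit into the buffer; the rest are rejected.
    println(acceptedSends(capacity = 2, attempts = 4)) // [true, true, false, false]
}
```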

One is to replace offer(value) with sendBlocking(value). In this case the thread calling the callback gets blocked on buffer overflow until there is more room in the buffer. It is a typical way to signal backpressure in most legacy streaming callback-based APIs and it guarantees that no value is ever lost.
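Here is a minimal sketch of the blocking approach. It assumes a recent kotlinx.coroutines release, where trySendBlocking takes the place of sendBlocking; blockingSource and collectAll are hypothetical names, and the producer thread stands in for a callback-based API.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical source: a producer thread pushes values and is blocked
// whenever the buffer is full, so nothing is dropped.
fun blockingSource(count: Int): Flow<Int> =
    callbackFlow {
        thread {
            for (i in 1..count) trySendBlocking(i)
            close()
        }
        awaitClose()
    }

fun collectAll(count: Int): List<Int> = runBlocking {
    blockingSource(count).toList()
}

fun main() {
    // More values than the default buffer holds, yet all of them arrive.
    println(collectAll(1_000).size) // 1000
}
```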

If the number of expected values is limited or they should not be arriving too fast, then we can use the buffer operator⁶ to configure an unlimited buffer size by adding a .buffer(Channel.UNLIMITED) call after callbackFlow { ... }. In this case offer always returns true, so no value will ever be lost and there is no blocking. However, there is a potential to exhaust memory with the buffered values.
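A sketch of the unlimited buffer in action follows. The burst function is a hypothetical source that pushes all of its values synchronously, far more than the default buffer would hold, and trySend is again assumed as the newer counterpart of offer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical burst source: pushes `count` values synchronously, then closes.
fun burst(count: Int): Flow<Int> =
    callbackFlow {
        for (i in 1..count) trySend(i)
        close()
        awaitClose()
    }

fun collectedCount(flow: Flow<Int>): Int = runBlocking { flow.toList().size }

fun main() {
    // With an unlimited buffer every value is kept until it is collected.
    println(collectedCount(burst(1_000).buffer(Channel.UNLIMITED))) // 1000
}
```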

Conflation

Often, the stream of values represents some partial result of an operation or its status update, so that only the most recent value is truly interesting. It means that values can be safely conflated using the conflate operator on the resulting flow, which guarantees that offer always returns true and that the collector sees the most recent value, even though intermediate values can be dropped (conflated).
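Conflation can be sketched in the same way. In this example statusUpdates is a hypothetical source of rapid status updates and latestStatuses is a hypothetical helper; which intermediate values survive depends on timing, so only the final value is relied upon here.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source: pushes `count` status updates synchronously, then closes.
fun statusUpdates(count: Int): Flow<Int> =
    callbackFlow {
        for (i in 1..count) trySend(i)
        close()
        awaitClose()
    }

// Collects the conflated flow; intermediate updates may be missing.
fun latestStatuses(count: Int): List<Int> = runBlocking {
    statusUpdates(count).conflate().toList()
}

fun main() {
    val seen = latestStatuses(1_000)
    // Far fewer than 1000 values are typically seen, but the last one is kept.
    println("collected ${seen.size} value(s), last = ${seen.last()}")
}
```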

Reactive streams

If the original source of an asynchronous stream is represented as a reactive stream that is compliant with the reactive streams specification, then the built-in Publisher.asFlow extension function from the kotlinx-coroutines-reactive module should be used to convert a reactive streams Publisher type to a Kotlin Flow⁷. No need to reinvent the wheel.
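As a small illustration, the sketch below converts a Publisher to a Flow and collects it. It assumes that the kotlinx-coroutines-reactive module and the reactive streams API are on the classpath. The Publisher is a stand-in built with asPublisher from the same module purely for the demo (a real one would come from a reactive library), and collectFromPublisher is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Converts any reactive streams Publisher to a Flow and collects it.
fun collectFromPublisher(publisher: Publisher<Int>): List<Int> =
    runBlocking { publisher.asFlow().toList() }

fun main() {
    // Stand-in Publisher, created from a flow only for demonstration.
    val publisher: Publisher<Int> = flowOf(1, 2, 3).asPublisher()
    println(collectFromPublisher(publisher)) // [1, 2, 3]
}
```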

Further reading and footnotes

  1. "Blocking threads, suspending coroutines" explains the practical difference between blocking and suspension.
  2. "Extension-oriented design" highlights the design philosophy behind extension functions.
  3. The Coroutines design document has more details on callbacks and suspension.
  4. "Cold flows, hot channels" defines the concept of a cold data source.
  5. "Simple design of Kotlin Flow" gives the basics of flows.
  6. "Kotlin Flows and coroutines" shows the conceptual implementation of the buffer operator.
  7. "Reactive Streams and Kotlin Flows" emphasises the similarities between reactive streams and Kotlin flows.
