Callbacks and Kotlin Flows

Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events, … without the program blocking to wait for them (from Wikipedia)

Photo by Antoine Barrès on Unsplash

In programming languages without built-in support for asynchrony there are two patterns that are used to implement it: callbacks and futures (aka promises). In fact, callbacks are the basic primitives and futures in asynchronous programming are backed by callbacks.

For example, Java 5 type does not support waiting for its completion without blocking. You can only use a blocking method to wait for it. However, Java 8 has an extended type with method that installs a callback to wait for completion without blocking, thus making it usable for asynchronous programming.

Kotlin Coroutines support asynchrony by allowing suspension of a coroutine without blocking¹. Coroutines integrate with asynchronous libraries on JVM via callbacks.

Single-shot callback

Let us consider a hypothetical interface that has a method to perform an operation asynchronously and takes a callback parameter to report its completion with either a resulting value or an error:

interface Operation<T> {
fun performAsync(callback: (T?, Throwable?) -> Unit)
}

We can define a suspending extension function² in Kotlin to perform an operation without blocking by using from the standard library³:

suspend fun <T> Operation<T>.perform(): T =
suspendCoroutine { continuation ->
performAsync { value, exception ->
when
{
exception != null -> // operation had failed
continuation.resumeWithException(exception)
else -> // succeeded, there is a value
continuation.resume(value as T)
}
}
}

Notice, that this is a cold source of values. It does not do anything until it is called and does not do anything after it has returned since it waits for the operation’s completion via a callback.

Cancellable operation

It is a good engineering practice to provide some means of cancellation in an asynchronous API. For example, might have a method for this purpose:

interface Operation<T> {
fun performAsync(callback: (T?, Throwable?) -> Unit)
fun cancel() // cancels ongoing operation
}

We can now implement as a cancellable suspending function using from library:

suspend fun <T> Operation<T>.perform(): T =
suspendCancellableCoroutine { continuation ->
performAsync { /* ... as before ... */ }
continuation.invokeOnCancellation { cancel() }
}

Multi-shot callback

But what if delivers an asynchronous stream of values and calls the specified callback multiple times? It has to indicate its completion somehow, too. Let’s assume for this simple example that it does so by calling a callback with value.

We cannot use such with -like functions lest we get when we try to resume continuation the second time, because Kotlin suspension and continuations are single-shot.

Kotlin Flow to the rescue! Flow is explicitly designed to represent a cold asynchronous stream of multiple values. We can use function to convert a multi-shot callback into a flow:

fun <T : Any> Operation<T>.perform(): Flow<T> =
callbackFlow {
performAsync { value, exception ->
when
{
exception != null -> // operation had failed
close(exception)
value == null -> // operation had succeeded
close()
else -> // there is a value
offer(value as T)
}
}
awaitClose { cancel() }
}

Notice a number of important differences. First of all, is no longer a suspending function. It does not wait for anything by itself. It returns a cold . The code inside block is not called until this flow is collected by a caller of a terminal operation.

As before, installs a callback, but now instead of a we are working with a hot that is open to deliver values. So, the function is called for each value and the function is called to signal a failure or a successful completion. Here replaces and also serves an important function of suspending the block inside while the values are incoming.

Backpressure

What happens if delivers values to a callback faster than collecting coroutine can process them? Enter the question of backpressure that always arises when dealing with asynchronous data streams. There is a buffer to keep a few values, but when this buffer overflows returns and values are lost. There are several approaches to avoid or control the loss.

One is to replace with . In this case the thread calling the callback gets blocked on buffer overflow until there is more room in the buffer. It is a typical way to signal backpressue in most of legacy streaming callback-based APIs and it guarantees that no value is ever lost.

If the number of expected values is limited or they should not be arriving too fast then we can use operator to configure the unlimited buffer size by adding call after . In this case always returns , so no value will ever be lost and there is no blocking. However, there is a potential to exhaust memory with the buffered values.

Conflation

Often, the stream of values represents some partial result of operation or its status update so that only the most recent value is truly interesting. It means that values can be safely conflated using operator on the resulting flow, which guarantees that always returns and that the collector sees the most recent value, even though intermediate values can be dropped (conflated).

Reactive streams

If the original source of an asynchronous stream is represented as a reactive stream that is compliant with the reactive streams specification, then a built-in extension function from module shall be used to convert a reactive streams type to a Kotlin . No need to reinvent the wheel.

Further reading and footnotes

  1. ^ Blocking threads, suspending coroutines explains practical difference between blocking and suspension.
  2. ^ Extension-oriented design highlights design philosophy behind extension functions.
  3. ^ Coroutines design document has more details on callbacks and suspension.
  4. ^ Cold flows, hot channels defines the concept of a cold data source.
  5. ^ Simple design of Kotlin Flow gives the basics of flows.
  6. ^ Kotlin Flows and coroutines shows the conceptual implementation of operator.
  7. ^ Reactive Streams and Kotlin Flows emphasises the similarities between reactive streams and Kotlin flows.

Project Lead for the Kotlin Programming Language @JetBrains

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store