Callbacks and Kotlin Flows
Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events, … without the program blocking to wait for them (from Wikipedia)
In programming languages without built-in support for asynchrony there are two patterns that are used to implement it: callbacks and futures (aka promises). In fact, callbacks are the basic primitives and futures in asynchronous programming are backed by callbacks.
For example, Java 5 Future
type does not support waiting for its completion without blocking. You can only use a blocking get
method to wait for it. However, Java 8 has an extended CompletableFuture
type with whenComplete
method that installs a callback to wait for completion without blocking, thus making it usable for asynchronous programming.
Kotlin Coroutines support asynchrony by allowing suspension of a coroutine without blocking¹. Coroutines integrate with asynchronous libraries on JVM via callbacks.
Single-shot callback
Let us consider a hypothetical Operation
interface that has a method to perform an operation asynchronously and takes a callback parameter to report its completion with either a resulting value or an error:
interface Operation<T> {
fun performAsync(callback: (T?, Throwable?) -> Unit)
}
We can define a suspending extension function² in Kotlin to perform an operation without blocking by using suspendCoroutine
from the standard library³:
suspend fun <T> Operation<T>.perform(): T =
suspendCoroutine { continuation ->
performAsync { value, exception ->
when {
exception != null -> // operation had failed
continuation.resumeWithException(exception)
else -> // succeeded, there is a value
continuation.resume(value as T)
}
}
}
Notice, that this perform
is a cold source of values. It does not do anything until it is called and does not do anything after it has returned since it waits for the operation’s completion via a callback⁴.
Cancellable operation
It is a good engineering practice to provide some means of cancellation in an asynchronous API. For example, Operation
might have a cancel
method for this purpose:
interface Operation<T> {
fun performAsync(callback: (T?, Throwable?) -> Unit)
fun cancel() // cancels ongoing operation
}
We can now implement perform
as a cancellable suspending function using suspendCancellableCoroutine
from kotlinx.coroutines
library:
suspend fun <T> Operation<T>.perform(): T =
suspendCancellableCoroutine { continuation ->
performAsync { /* ... as before ... */ }
continuation.invokeOnCancellation { cancel() }
}
Multi-shot callback
But what if Operation
delivers an asynchronous stream of values and calls the specified callback multiple times? It has to indicate its completion somehow, too. Let’s assume for this simple example that it does so by calling a callback with null
value.
We cannot use such Operation
with suspendCoroutine
-like functions lest we get IllegalStateException
when we try to resume continuation the second time, because Kotlin suspension and continuations are single-shot.
Kotlin Flow to the rescue! Flow is explicitly designed to represent a cold asynchronous stream of multiple values⁵. We can use callbackFlow
function to convert a multi-shot callback into a flow:
fun <T : Any> Operation<T>.perform(): Flow<T> =
callbackFlow {
performAsync { value, exception ->
when {
exception != null -> // operation had failed
close(exception)
value == null -> // operation had succeeded
close()
else -> // there is a value
offer(value as T)
}
}
awaitClose { cancel() }
}
Notice a number of important differences. First of all, perform
is no longer a suspending function. It does not wait for anything by itself. It returns a cold Flow
. The code inside callbackFlow { ... }
block is not called until this flow is collected by a caller of a terminal operation.
As before, performAsync
installs a callback, but now instead of a Continuation
we are working with a hot SendChannel
that is open to deliver values. So, the offer
function is called for each value and theclose
function is called to signal a failure or a successful completion. Here awaitClose
replaces invokeOnCancellation
and also serves an important function of suspending the block inside callbackFlow
while the values are incoming.
Backpressure
What happens if performAsync
delivers values to a callback faster than collecting coroutine can process them? Enter the question of backpressure that always arises when dealing with asynchronous data streams. There is a buffer to keep a few values, but when this buffer overflows offers
returns false
and values are lost. There are several approaches to avoid or control the loss.
One is to replace offer(value)
with sendBlocking(value)
. In this case the thread calling the callback gets blocked on buffer overflow until there is more room in the buffer. It is a typical way to signal backpressue in most of legacy streaming callback-based APIs and it guarantees that no value is ever lost.
If the number of expected values is limited or they should not be arriving too fast then we can use buffer
operator to configure the unlimited buffer size by adding .buffer(Channel.UNLIMITED)
call after callbackFlow { ... }
. In this case offer
always returns true
, so no value will ever be lost and there is no blocking. However, there is a potential to exhaust memory with the buffered values⁶.
Conflation
Often, the stream of values represents some partial result of operation or its status update so that only the most recent value is truly interesting. It means that values can be safely conflated using conflate
operator on the resulting flow, which guarantees that offer
always returns true
and that the collector sees the most recent value, even though intermediate values can be dropped (conflated).
Reactive streams
If the original source of an asynchronous stream is represented as a reactive stream that is compliant with the reactive streams specification, then a built-in Publisher.asFlow
extension function from kotlinx-coroutines-reactive
module shall be used to convert a reactive streams Publisher
type to a Kotlin Flow
⁷. No need to reinvent the wheel.
Further reading and footnotes
- ^ Blocking threads, suspending coroutines explains practical difference between blocking and suspension.
- ^ Extension-oriented design highlights design philosophy behind extension functions.
- ^ Coroutines design document has more details on callbacks and suspension.
- ^ Cold flows, hot channels defines the concept of a cold data source.
- ^ Simple design of Kotlin Flow gives the basics of flows.
- ^ Kotlin Flows and coroutines shows the conceptual implementation of
buffer
operator. - ^ Reactive Streams and Kotlin Flows emphasises the similarities between reactive streams and Kotlin flows.