Exceptions in Kotlin Flows
Conceptually Kotlin’s Flow<T>
type represents an asynchronous cold stream¹ of elements of type T
that can complete successfully or with an exception. Let us see how these exceptions can be handled and what we can learn about flows and exceptions from the basic principles.
Suppose that we are writing a UI application that displays an updating stream of values in UI and thus collects them from a flow. This application has a uiScope
that is a CoroutineScope
which lifetime is bound to the corresponding UI element that is displaying the data. There is a dataFlow()
function that returns a flow with the data to be displayed and so the data display can be activated like this:
uiScope.launch { // launch a UI display coroutine
dataFlow().collect { value -> updateUI(value) }
}
Flow guarantees that updateUI
is always called in the collector’s execution context which is defined by uiScope
here. Even if dataFlow()
is using a different context internally this fact does not leak from it in any way².
But what happens if there is an error in the dataFlow()
? In this case, collect
call throws an exception, which leads to exceptional completion of the coroutine, which gets propagated to the uiScope
and, usually, will end up calling an uncaught exception handler (CoroutineExceptionHandler
) in its context. This is fine if exception was truly unexpected and should never happen in correct code, but what if dataFlow()
, for example, is reading data from the network and a failure will quite expectedly happen when there is something wrong with the network? It needs to be handled. The failure is reported via an exception and can be handled just like exceptions are normally handled in Kotlin — using try
/catch
block³:
uiScope.launch {
try {
dataFlow().collect { value -> updateUI(value) }
} catch (e: Throwable) {
showErrorMessage(e)
}
}
If we encapsulate this exception-handling logic into an operator on the flow⁴ then we can simplify this code, reduce nesting, and make it more readable:
uiScope.launch {
dataFlow()
.handleErrors() // handle dataFlow errors
.collect { value -> updateUI(value) }
}
But how can we implement this handleErrors
function? A naive attempt to write it is shown below:
fun <T> Flow<T>.handleErrors(): Flow<T> = flow {
try {
collect { value -> emit(value) }
} catch (e: Throwable) {
showErrorMessage(e)
}
}
This implementation collects values from the upstream flow it is called on and emits them downstream, wrapping the collect
call into the try
/catch
block just as we did before. It simply abstracts the code we initially wrote. Would it work? Yes, for this particular case. So why exactly this implementation is naive?
Exception transparency
Think about the properties of the flow that is returned by handleErrors
:
val flow = dataFlow().handleErrors()
It emits some values like any other flow, but it also has an additional property that other flows do not have — any error in the downstream flow is caught by it. Consider the code below with Kotlin error
function as a litmus test:
flow.collect { error("Failed") }
If you run it with a simple flow then this code throws anIllegalStateException
on the first emitted value. But with the flow returned by handleError
this exception gets caught and does not appear, so collect
call completes normally. It is totally surprising to the reader of this code who should not be required to know the implementation details of the flow they are trying to collect from.
Kotlin flows are designed to allow modular reasoning about data streams. The only supposed effects of flows are their emitted values and completion, so flow operators like handleError
are not allowed by the flow specification. Every flow implementation has to ensure exception transparency — a downstream exception must always be propagated to the collector⁵.
Handling exceptions
Kotlin flows provide several exception handling operators that ensure exception transparency. In our case we can use the catch
operator:
fun <T> Flow<T>.handleErrors(): Flow<T> =
catch { e -> showErrorMessage(e) }
However, the resulting code has a different behavior from the original one we wrote using try
/catch
because it does not catch the errors that might happen inside collect { value -> updateUI(value) }
call due to exception transparency. We can continue handling the errors in updateUI
by rewriting the code like this:
uiScope.launch {
dataFlow()
.onEach { value -> updateUI(value) }
.handleErrors()
.collect()
}
Moving updateUI
from collect
into onEach
operator, we have placed it before the error-handling in handleErrors
, so updateUI
errors are handled now, too. As a finishing touch we can now merge launch
and collect
calls using launchIn
terminal operator, further reducing nesting in this code and turning it into a simple left-to-right sequence of operators:
dataFlow()
.onEach { value -> updateUI(value) }
.handleErrors()
.launchIn(uiScope)
API Status
Kotlin Flow is an experimental type in kotlinx.coroutines
library as of the second milestone (preview) version 1.3.0-M2
to the upcoming 1.3.0
release. Some further changes might still be possible before it is released as a stable API, but the overall API shape looks quite solid now.
Footnotes and further reading
- ^ Cold flows, hot channels gives a definition of cold stream.
- ^ Execution context of Kotlin Flows has more details on contexts.
- ^ Kotlin exceptions reference documentation explains Kotlin exceptions.
- ^ Simple design of Kotlin Flow introduces the concept of operator.
- ^ Flow documentation has more detailed exception transparency description.