Execution context of Kotlin Flows

Roman Elizarov
Jun 2, 2019

[Image: Context, by Bart Everson]

There are many cases where the execution context of code is important. In server-side programs the context may carry diagnostic information; in UI applications widgets can only be touched from a specific main thread. This can become a problem as your code grows, especially when you decouple data producers from data consumers. Kotlin Flows are designed to enable such modularization, so let us see how they behave with respect to execution context.

Collector

For example, a UI application may launch a coroutine in the main thread to collect elements from a flow returned by some dataFlow() function and update the display with its values:

launch(Dispatchers.Main) { // launch in the main thread
    initDisplay() // prepare ui
    dataFlow().collect { // block of the collector begins
        updateDisplay(it) // update ui
    }
}

Recall¹ that collecting a flow runs the code of the flow emitter, which calls emit to deliver values into the block of the collector. But what if the author of this flow had to perform some CPU-consuming computation and wrote something like this:

fun dataFlow(): Flow<Data> = flow { // create emitter
    withContext(Dispatchers.Default) {
        while (isActive) {
            emit(someDataComputation())
        }
    }
}

This implementation of the dataFlow() function calls someDataComputation() in the context of the Default dispatcher to ensure that it does not block important threads, like the UI main thread².

If this kind of flow emitter implementation were allowed, then updateDisplay in the collector’s code would try to update the UI from the wrong thread. The very possibility of this kind of flow implementation would force every flow collector to write boilerplate code to ensure that its block executes in the right context, or to establish project-wide conventions on the context in which elements of the flow may be emitted. Such conventions are hard to maintain as a project grows and third-party libraries and operators are taken into account. Some of them might fail to document their emission context at all, and even when they do, it places too much cognitive load on developers, who must always consult the documentation and remember to explicitly specify the context they need. The worst part of this story is that if you forget about the context, you might end up with code that passes all the tests but sometimes produces subtle and hard-to-reproduce errors at runtime.

That is why this kind of implementation of Kotlin Flow is not allowed. Every flow implementation has to preserve the collector’s context. A failure to do so, such as emitting from inside withContext, results in a runtime exception that would typically show up during the first test run. It means that, in fact, the flow { ... } builder function does not directly pass the value to the collector’s block on emit, but contains logic to check this context preservation invariant.
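
The check can be observed in isolation. The following is a minimal sketch, assuming kotlinx.coroutines is on the classpath; brokenFlow and violationDetected are hypothetical names introduced only for this illustration, and the violation is assumed to surface as an IllegalStateException:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that breaks the rule: it emits from inside withContext,
// that is, from a context other than the collector's.
fun brokenFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// Collects the broken flow and reports whether the invariant check fired.
fun violationDetected(): Boolean = runBlocking {
    try {
        brokenFlow().collect { }
        false // not expected: the emission should have been rejected
    } catch (e: IllegalStateException) {
        true // the flow builder detected the context change on emit
    }
}

fun main() {
    println("violation detected: ${violationDetected()}")
}
```

Because the failure happens on the very first emit, a single test that collects the flow is enough to catch the mistake.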

With Kotlin Flows it is perfectly safe to write the collector as it was written. No worries. The context in which collect is called will get preserved for the block of code that is passed to this call.

Emitter

But what is the correct way to implement dataFlow() emitter function? To start, we shall remove withContext and emit from the collector’s context:

fun dataFlow(): Flow<Data> = flow { // create emitter
    // isActive is read from the context of the collecting coroutine
    while (currentCoroutineContext().isActive) {
        emit(someDataComputation())
    }
}

However, someDataComputation might block the collector’s thread, freezing the UI. There are two ways around it. One is to encapsulate the appropriate context in someDataComputation itself:

suspend fun someDataComputation(): Data = // suspend is required to call withContext
    withContext(Dispatchers.Default) {
        // implementation here
    }

This works well for an isolated function, but it is not convenient if all of the code in the flow { ... } needs some specific context, nor is it efficient to switch context back and forth on every value. So there is another solution, using the flowOn operator on the resulting flow:

fun dataFlow(): Flow<Data> = flow { // create emitter
    // isActive is read from the context of the collecting coroutine
    while (currentCoroutineContext().isActive) {
        emit(someDataComputation())
    }
}.flowOn(Dispatchers.Default) // ^ works on the flow before it

The flowOn function changes the context for the flow that it is applied to, while ensuring context preservation for the collector that is going to be applied after it. The implementation of the flowOn operator creates a separate coroutine with the specified Dispatchers.Default context to collect the flow that calls someDataComputation, while emitting in the context of the original collector³.
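
To see the split between the two contexts, here is a small sketch, again assuming kotlinx.coroutines. The someDataComputation below is a hypothetical stand-in that simply reports the thread it runs on, and emitterAndCollectorThreads is a name made up for this example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for someDataComputation(): returns the name of the
// thread that performed the "computation".
fun someDataComputation(): String = Thread.currentThread().name

// Returns (thread that computed the value, thread that collected it).
fun emitterAndCollectorThreads(): Pair<String, String> = runBlocking {
    var emitterThread = ""
    var collectorThread = ""
    flow { emit(someDataComputation()) }
        .flowOn(Dispatchers.Default) // the flow above is collected on Default
        .collect { value ->
            // This block runs in the context of runBlocking, not on Default
            emitterThread = value
            collectorThread = Thread.currentThread().name
        }
    emitterThread to collectorThread
}

fun main() {
    val (emitter, collector) = emitterAndCollectorThreads()
    println("computed on:  $emitter")
    println("collected on: $collector")
}
```

The computation should be reported on a thread of the Default dispatcher, while the collector's block stays on the thread that called runBlocking.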

Operators

The same context preservation rule applies to operators on the flows. Consider the following flow:

dataFlow()
    .map { opA(it) } // in contextA
    .flowOn(contextA)
    .map { opB(it) } // in collector's context

Here, opB is called in the collector’s context, while opA is called in contextA because it sits above the flowOn operator.
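
One way to check which context each operator runs in is to tag the contexts with a CoroutineName and read it back inside the operators. This is only a sketch under that assumption; contextName and observedContexts are hypothetical helpers, and the logging lambdas stand in for opA and opB:

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: the CoroutineName of the context the caller runs in.
suspend fun contextName(): String? = currentCoroutineContext()[CoroutineName]?.name

// Runs a two-operator pipeline and records where each operator executed.
fun observedContexts(): List<String> = runBlocking(CoroutineName("collector")) {
    val contextA = Dispatchers.Default + CoroutineName("contextA")
    val seen = CopyOnWriteArrayList<String>() // written from two threads
    flowOf(1)
        .map { seen.add("opA in ${contextName()}"); it } // above flowOn
        .flowOn(contextA)
        .map { seen.add("opB in ${contextName()}"); it } // below flowOn
        .collect()
    seen.toList()
}

fun main() {
    observedContexts().forEach(::println)
}
```

Moving the flowOn call below the second map would place both operators in contextA, which is why the position of flowOn in the chain matters.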

All in all, the rules for execution context with Kotlin Flows are straightforward. Non-blocking code that does not care about its execution context need not take any special precautions. Collectors can always be sure that their execution context is preserved. For code that needs some specific execution context, there is the flowOn operator, which can be placed after the corresponding context-sensitive code to collect the flow above it in the specified context.
