Simple design of Kotlin Flow
In a previous “Cold flows, hot channels” story¹ I’ve defined cold and hot data streams and shown a use-case for Kotlin Flows — cold asynchronous streams. Now let us peek under the hood, examine their design, and see how a combination of language features and a library enables a powerful abstraction with simple design.
A Flow
in Kotlin is represented by an interface²:
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
All there is to a flow is a single collect
function that accepts an instance of FlowCollector
interface with a single emit
method:
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
An emit
name should sound familiar to a reader of “Cold flows, hot channels”. Indeed, there I’ve shown an example of the following flow definition:
val ints: Flow<Int> = flow {
for (i in 1..10) {
delay(100)
emit(i) // <-- emit is called here
}
}
A signature of the flow
builder also uses a FlowCollector
interface as a receiver³, so that we can emit
directly from the body of the corresponding lambda:
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
For a simple usage of a flow, when the flow is collected, like this:
ints.collect { println(it) } // takes 1 second, prints 10 ints
what happens is that an instance of FlowCollector
is created based on the lambda passed to collect { … }
function and this very instance is then passed to the flow { … }
body⁴.
Thus an interaction between a flow emitter and a flow collector is that of a simple function call — a call of emit
function. If we mentally inline this function call, we can immediately understand what happens when we run this code⁵ — it is going to be equivalent to:
for (i in 1..10) {
delay(100)
println(i) // <-- emit was called here
}
Operators
A flow
builder and a collect
terminal operator is all we need to know to start writing operators that transform flows in a variety of ways. For example, a basic map
operator that applies a specified transform to every emitted value can be implemented like this:
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R) = flow {
collect { emit(transform(it)) }
}
Using this operator we can now do ints.map { it * it }
to define a flow with squares of the original integers. Elements still flow from the emitter to the collector via function calls. There is simply one more function in between now.
Actually, kotlinx.coroutines
library already defines map
and a host of other general purpose operators as extensions on the Flow
type, following extension-oriented design⁶ approach. What is important in this design, is that it is quite easy to define domain-specific operators. There is no distinction between “built-in” and “user-defined” operators — all operators are first-class.
Back-pressure
Back-pressure in software engineering is defined as the ability of a data consumer that cannot keep up with incoming data to send a signal to the data producer to slow down the rate of the data elements.
Traditional reactive streams⁷ design involves a back-channel to request more data from producers as needed. Management of this request protocol leads to notoriously difficult⁸ implementations even for simple operators. We do not see any of this complexity in the design of Kotlin flows, nor in the implementation of operators for them, yet Kotlin flows do support back-pressure. How come?
Transparent back-pressure management is achieved in Kotlin flows via the use of Kotlin suspending functions. You might have noticed that all functions and functional types in Kotlin flow design are marked with suspend
modifier — these functions have a super-power to suspend execution of caller without blocking a thread⁹. So, when collector of the flow is overwhelmed, it can simply suspend the emitter and resume it later when it is ready to accept more elements.
This is quite similar to back-pressure management in traditional thread-based synchronous data pipelines, where a slow consumer automatically applies back-pressure onto the producer by the virtue of blocking producer’s thread. Suspending functions take it beyond a single thread and into the realm of asynchronous programming, by transparently managing back-pressure across the threads without blocking them. But that is to be told in another story.
Further reading and footnotes
- ^ Cold flows, hot channels
- ^
Flow
and related types and functions are still in preview as of version1.2.1
ofkotlinx.coroutines
library. Read more here. - ^ Function types in Kotlin
- ^ This is a slight simplification. It does not take into account additional checks to ensure preservation of context, but that topic is out of the scope of this story. More details in Execution context of Kotlin Flows.
- ^ You can run this code via Kotlin Playground here.
- ^ Extension-oriented design
- ^ Reactive streams
- ^ Implementing operators for [RxJava] 2.0
- ^ Blocking threads, suspending coroutines