Shared flows, broadcast channels

Roman Elizarov
6 min read · Nov 16, 2020


Photo by Davies Designs Studio on Unsplash

Once upon a time coroutines were introduced to Kotlin and they were lightweight. We could launch a multitude of coroutines and we needed a way to communicate between those coroutines without running into a dreaded “mutable shared state” problem.

Thus Channel was added as an inter-coroutine communication primitive. The channels are wonderful. Channels support one-to-one, one-to-many, many-to-one, and many-to-many communication between coroutines, and every value that is sent to the channel is received once.

Diagram of many-to-many channel operation

Because each value is received only once, you cannot use channels to distribute events or state updates in a way that allows multiple subscribers to independently receive and react to them.

Thus the BroadcastChannel interface was introduced, with a buffered implementation and ConflatedBroadcastChannel. They served us well for a while, but they turned out to be a design dead-end. Since kotlinx-coroutines version 1.4, there is a better solution: shared flows. Read on for the full story.

Flows are simple

In the early versions of the library, we had only channels and we tried to implement various transformations of asynchronous sequences as functions that take one channel as an argument and return another channel as a result. It means that, for example, a filter operator would run in its own coroutine.

Diagram of filter operator with channels

The performance of such an operator was far from great, especially compared to just writing an if statement. In hindsight, it is not surprising, because a channel is a synchronization primitive. Any channel, even an implementation that is optimized for a single producer and a single consumer, must support concurrently communicating coroutines, and data transfer between them needs synchronization, which is expensive in modern multicore systems. When you start building your application architecture on top of asynchronous data streams, the need for transformations naturally appears, and the channel costs start to accrue.

The simple design of Kotlin Flow allows efficient implementation of transformation operators. In basic cases, values are emitted, transformed, and collected in the same coroutine, without any need for synchronization.

Diagram of filter operator with flows

Synchronization is introduced with flows only when it is needed for emission and collection of values in different coroutines.
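To make the contrast concrete, here is a minimal sketch of a filter over a flow. The function names are made up for this illustration. In the first function everything runs in the calling coroutine; in the second, flowOn moves the upstream emission to another coroutine, which is where a channel-like buffer and its synchronization come back in.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emission, filtering, and collection all happen in the caller's coroutine.
suspend fun evenNumbers(): List<Int> =
    flowOf(1, 2, 3, 4, 5)
        .filter { it % 2 == 0 }
        .toList()

// flowOn makes the upstream emit in a different coroutine,
// so values now cross a coroutine boundary before being filtered.
suspend fun evenNumbersWithFlowOn(): List<Int> =
    flowOf(1, 2, 3, 4, 5)
        .flowOn(Dispatchers.Default)
        .filter { it % 2 == 0 }
        .toList()

fun main(): Unit = runBlocking {
    println(evenNumbers())           // [2, 4]
    println(evenNumbersWithFlowOn()) // [2, 4]
}
```

Both functions return the same result; the difference is only in how much machinery sits between the emitter and the collector.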

Flows are cold

However, flows are typically cold: a Flow<Value> created by the flow { … } builder function is a passive entity. Consider the following code:

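val coldFlow = flow {
    // keep emitting while the collecting coroutine is active
    while (currentCoroutineContext().isActive) {
        emit(nextEvent) // nextEvent stands for the next value to produce
    }
}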

The flow itself is not backed by any kind of computation and does not have any state by itself until it starts to be collected. Every collector coroutine executes its own instance of emitting code. The story on “Cold flow, hot channels” describes the reasoning behind Kotlin Flows and shows use-cases for which they fit better than channels — returning asynchronous streams of values that are computed on demand.
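A small sketch can show that every collector runs its own instance of the emitting code. The counter and the flow name below are invented for this illustration; they only make the number of executions visible.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the builder body has started (illustration only).
val starts = AtomicInteger(0)

val coldNumbers: Flow<Int> = flow {
    starts.incrementAndGet() // runs once per collection, not once per flow
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    println(starts.get())         // 0: creating the flow ran nothing
    println(coldNumbers.toList()) // [1, 2]
    println(coldNumbers.toList()) // [1, 2]
    println(starts.get())         // 2: the body ran once for each collection
}
```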

Diagram of cold flow operation

But how do you handle things like user actions, external device events, state updates, etc? They are operating independently of whether there is any code that is interested in them. They should support multiple observers inside the application. These are so-called hot sources of events.

Shared flows

That’s where the concept of a SharedFlow comes in. A shared flow exists regardless of whether it is being collected or not. A collector of the shared flow is called a subscriber. All subscribers of a shared flow receive the same sequence of values. It effectively works like a “broadcast channel”, without most of the channel overhead. It makes the concept of a broadcast channel obsolete.

Diagram of shared flow operation

Essentially a shared flow is a lightweight broadcast event bus that you can create and use in your application architecture.

class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}
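As a usage sketch, the bus can be exercised with two subscribers. The Event class, the extra subscriptionCount property, and the broadcastToTwo function are assumptions added for this demo; they are not part of the class above. The demo waits for both subscribers before posting, because events posted earlier would not reach them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Event(val name: String) // hypothetical event type for this sketch

class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow()

    // Added for the demo only, so that it can wait for subscribers to appear.
    val subscriptionCount: StateFlow<Int> get() = _events.subscriptionCount

    suspend fun postEvent(event: Event) = _events.emit(event)
}

suspend fun broadcastToTwo(): List<List<Event>> = coroutineScope {
    val bus = BroadcastEventBus()
    // Each subscriber takes the first two events and then completes.
    val a = async { bus.events.take(2).toList() }
    val b = async { bus.events.take(2).toList() }
    // Wait until both collectors have actually subscribed.
    bus.subscriptionCount.first { it == 2 }
    bus.postEvent(Event("first"))
    bus.postEvent(Event("second"))
    listOf(a.await(), b.await())
}

fun main(): Unit = runBlocking {
    // Both subscribers print the same two events.
    broadcastToTwo().forEach { println(it) }
}
```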

It has tunable parameters such as replay, the number of old events to keep and replay for new subscribers, and extraBufferCapacity, which provides a cushion for fast emitters and slow subscribers.

All subscribers of a shared flow collect events asynchronously in their own context. The emitter does not wait until subscribers finish processing the events. However, when the shared flow buffer is full, the emitter suspends until there is room in the buffer. This suspension of the emitter on buffer overflow provides back-pressure to slow down emission when collectors cannot keep up. Alternative strategies to deal with buffer overflow are supported via the onBufferOverflow parameter, which takes a BufferOverflow value.
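The following sketch shows one possible combination of these parameters: a replay of two values together with the DROP_OLDEST overflow strategy, so the emitter never suspends. The function names are invented for the example.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Keeps the two most recent values for late subscribers and drops older ones.
fun replayLastTwo(): MutableSharedFlow<Int> =
    MutableSharedFlow<Int>(replay = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)

suspend fun lateSubscriberSees(): List<Int> {
    val shared = replayLastTwo()
    // There are no subscribers yet, so the values only land in the replay cache.
    for (i in 1..5) shared.tryEmit(i)
    // A new subscriber first receives the replay cache.
    return shared.take(2).toList()
}

fun main(): Unit = runBlocking {
    println(lateSubscriberSees()) // [4, 5]
}
```

With the default BufferOverflow.SUSPEND strategy and active subscribers that are slow, a suspending emit call would wait for buffer space instead of dropping anything.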

State flows

A popular way to deal with buffer overflow is to drop the oldest events and retain only the most recent, newest events. In particular, it is a great way to model state variables in an application. It is such a widespread use-case that it has its own specialized StateFlow type which serves as a replacement for a ConflatedBroadcastChannel, which became obsolete, too.

class StateModel {
    private val _state = MutableStateFlow(initial)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Value) {
        _state.value = newValue // NOT suspending
    }
}

Think of a val x: StateFlow<T> as an asynchronous and observable counterpart of var x: T. Its most recent value is always available and, in fact, the most recent value is the only one that matters, so updating it is always possible without suspension.
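Here is a minimal sketch of that behavior, using an Int state for simplicity. CounterModel and latestOnly are hypothetical names for this example. The state is updated three times without suspension, and both a direct read and a new collector see only the latest value.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

class CounterModel {
    private val _state = MutableStateFlow(0)
    val state = _state.asStateFlow() // read-only public view

    fun update(newValue: Int) {
        _state.value = newValue // a plain assignment, not a suspending call
    }
}

suspend fun latestOnly(): List<Int> {
    val model = CounterModel()
    model.update(1)
    model.update(2)
    model.update(3)
    val direct = model.state.value      // read like a variable
    val collected = model.state.first() // a new collector starts from the current value
    return listOf(direct, collected)
}

fun main(): Unit = runBlocking {
    println(latestOnly()) // [3, 3]
}
```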

With the state flow, the performance difference between a complex channel and a simple flow becomes quite apparent. An implementation of a state flow has allocation-free updates, which was not the case with a conflated broadcast channel.

A use-case for channels

As different kinds of shared flows replaced different kinds of broadcast channels, the popular question is: what is going to happen with plain, regular channels? They are going to stay for many reasons. One reason is that channels are low-level primitives that are used to implement many complex flow operators.

But channels also have their application use-cases. Channels are used to handle events that must be processed exactly once* (see sidenote below for details). This happens in a design with a type of event that usually has a single subscriber, but intermittently (at startup or during some kind of reconfiguration) there are no subscribers at all, and there is a requirement that all posted events must be retained until a subscriber appears.

class SingleShotEventBus {
    private val _events = Channel<Event>()
    val events = _events.receiveAsFlow() // expose as flow

    suspend fun postEvent(event: Event) {
        _events.send(event) // suspends on buffer overflow
    }
}

Both BroadcastEventBus, which is written with the SharedFlow in the first example, and this SingleShotEventBus, which is written with the Channel, publicly expose their events as Flow<Event>, yet they have an important difference.

With the shared flow, events are broadcast to an unknown number (zero or more) of subscribers. In the absence of a subscriber, any posted event is immediately dropped. It is a design pattern to use for events that must be processed immediately or not at all.

With the channel, each event is delivered to a single subscriber. An attempt to post an event without subscribers will suspend as soon as the channel buffer becomes full, waiting for a subscriber to appear. Posted events are not dropped.
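The difference can be sketched side by side. The two functions below are illustrations, not the event bus classes themselves, and the channel is given a buffer of one element (an assumption for this sketch) so that the early post does not suspend.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Shared flow: an event posted before anyone subscribes is dropped.
suspend fun viaSharedFlow(): String = coroutineScope {
    val events = MutableSharedFlow<String>()
    events.emit("early") // no subscribers: returns at once, the event is gone
    val received = async { events.first() }
    events.subscriptionCount.first { it == 1 } // wait for the subscriber
    events.emit("late")
    received.await()
}

// Channel: an event posted before anyone receives is retained.
suspend fun viaChannel(): String {
    val events = Channel<String>(capacity = 1)
    events.send("early") // kept in the buffer until a receiver appears
    return events.receiveAsFlow().first()
}

fun main(): Unit = runBlocking {
    println(viaSharedFlow()) // late
    println(viaChannel())    // early
}
```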

Note that the SingleShotEventBus implementation with a channel processes each posted event exactly once only in the absence of cancellation. When a subscriber to the flow is cancelled, the event may fail to get delivered. See the documentation on undelivered elements in channels for details.
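For illustration, the Channel factory function accepts an onUndeliveredElement callback. The sketch below covers only one of the documented cases, cancelling a channel that still has buffered elements, and the function name is made up. It does not reproduce the subscriber-cancellation race described above.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Returns the elements that were sent but never received.
suspend fun undeliveredOnCancel(): Set<String> {
    val undelivered = mutableSetOf<String>()
    val channel = Channel<String>(
        capacity = 2,
        onUndeliveredElement = { undelivered.add(it) } // called for each lost element
    )
    channel.send("a") // both sends fit into the buffer, so neither suspends
    channel.send("b")
    channel.cancel()  // the buffered elements can no longer be received
    return undelivered
}

fun main(): Unit = runBlocking {
    println(undeliveredOnCancel()) // contains "a" and "b"
}
```

A callback like this is typically used to release resources carried by the lost elements, rather than to redeliver them.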

Bottom line

Know the difference and use both shared flows and channels appropriately. They are both useful and are designed to work well together. However, broadcast channels are obsolete artifacts of the past; they will be deprecated and removed in the future.

Written by Roman Elizarov

Project Lead for the Kotlin Programming Language @JetBrains
