Cold flows, hot channels

Roman Elizarov
Apr 8, 2019

Markus Trienke, Sunset over drift ice

Asynchronous, long-running, or remote operations can be expressed using a future type. For example, a function that returns a Value could be implemented as:

fun fooAsync(p: Params): CompletableFuture<Value> =
    CompletableFuture.supplyAsync { bar(p) }

When you call fooAsync(p), you get a promise to deliver a value in the future, and an operation bar is running in the background to compute this value. Now you have to be careful not to lose the reference to this future, because the future is effectively a resource, like an open file: you must either wait for it or cancel it if its value is no longer needed¹.

This is known as a hot data source. Unlike a regular function, which is active only while it is being called, a hot source is active even outside of the call to the corresponding function: it might have been active in the background before the function was even called, and it can still be active after the function has returned, as we see here.
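To make the hot behavior concrete, here is a minimal sketch using only java.util.concurrent. It assumes a hypothetical fooAsync that takes an Int instead of Params, with `p * 2` standing in for bar(p) and an `onStart` callback added purely to observe when the work begins. The background task starts as soon as fooAsync is called, even though nobody has asked for the value yet, which is why the caller has to keep the reference and either wait for the future or cancel it.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the article's fooAsync: `p * 2` plays the role of bar(p),
// and `onStart` lets us observe when the background work begins.
fun fooAsync(p: Int, onStart: () -> Unit): CompletableFuture<Int> =
    CompletableFuture.supplyAsync {
        onStart()      // runs on a pool thread as soon as fooAsync is called
        p * 2
    }

// Returns true if the background work started even though nobody waited for the result.
fun startsBeforeAnyoneWaits(): Boolean {
    val started = CountDownLatch(1)
    val future = fooAsync(21) { started.countDown() }
    val ran = started.await(1, TimeUnit.SECONDS)  // note: no get() or join() on the future
    future.cancel(false)  // we no longer need the value, so we give it up explicitly
    return ran
}
```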

Suspending functions

The Kotlin programming language supports suspending functions. The idiomatic way in Kotlin to represent this asynchronous operation, while avoiding the pitfalls of programming with hot futures, is:

suspend fun foo(p: Params): Value =
    withContext(Dispatchers.Default) { bar(p) }

A caller of foo is suspended while the bar operation is in progress. There is no need to worry about accidentally losing a reference to a background operation, and code written with suspending functions looks familiar, like regular, blocking, synchronous code. This definition of foo is cold: it does nothing before it is called and nothing after it returns a value.
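A runnable sketch of the same idea, assuming kotlinx.coroutines is on the classpath; `bar`, the `barCalls` counter, and the Int parameter are hypothetical stand-ins for the article's bar and Params. It shows that bar runs only while foo is being called, and that the result comes back directly, with no future object to keep track of.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Counts how many times the hypothetical `bar` has actually run.
val barCalls = AtomicInteger(0)

// Hypothetical CPU-bound work standing in for the article's bar(p).
fun bar(p: Int): Int {
    barCalls.incrementAndGet()
    return p * 2
}

// The caller suspends (its thread is not blocked) while bar runs on Dispatchers.Default.
suspend fun foo(p: Int): Int = withContext(Dispatchers.Default) { bar(p) }

// Returns the number of bar calls observed before calling foo, paired with foo's result.
fun demo(): Pair<Int, Int> = runBlocking {
    val callsBefore = barCalls.get()  // merely declaring foo started nothing: it is cold
    val result = foo(21)              // bar runs only for the duration of this call
    callsBefore to result
}
```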

Collection of values

What if an operation returns a collection of values? We can use a List type:

suspend fun foo(p: Params): List<Value> =
    buildList² { while (hasMore) add(nextValue) }

This signature perfectly captures the API of a REST endpoint that returns a JSON array of values, or of a similar RPC/RMI endpoint. It suspends its caller while the operation is in progress and returns the whole list at once.
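The sketch below assumes a hypothetical paged endpoint, `fetchPage`, that suspends to simulate network latency and returns null when no pages remain. To avoid depending on buildList (see footnote 2), it builds the list with mutableListOf instead. The caller receives nothing until every page has arrived.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical remote source: returns the items of a page, or null when there are no more.
suspend fun fetchPage(page: Int): List<Int>? {
    delay(10)  // simulate network latency; suspends instead of blocking
    return if (page < 3) listOf(page * 10, page * 10 + 1) else null
}

// Suspends until every page has arrived, then returns the whole list at once.
suspend fun foo(): List<Int> {
    val result = mutableListOf<Int>()
    var page = 0
    while (true) {
        val items = fetchPage(page++) ?: break
        result += items
    }
    return result
}
```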

Stream of values

But what if we are trying to represent a streaming API, where values arrive one by one? For example, they might arrive as websocket messages, as a stream of gRPC messages, or via another streaming protocol like RSocket.

For synchronous streams Kotlin provides a Sequence data type:

fun foo(p: Params): Sequence<Value> =
    sequence { while (hasMore) yield(nextValue) }

However, if you use a sequence as the return type of a streaming API, then waiting for incoming values must block the caller's thread. This is bad for UI apps and bad for scalable server-side code. For asynchronous programming we want to suspend a coroutine instead³.
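A small sketch of the blocking problem, assuming Thread.sleep as a stand-in for waiting on a network message; the `produced` counter is hypothetical and only makes the laziness visible. The sequence is cold, so taking two values runs the body exactly twice, but every wait blocks whichever thread is iterating.

```kotlin
// Counts how many values the sequence has produced so far.
var produced = 0

// Hypothetical blocking stream: each value takes about 10 ms to "arrive".
fun foo(count: Int): Sequence<Int> = sequence {
    for (i in 1..count) {
        Thread.sleep(10)  // waiting for the next value blocks the consumer's thread
        produced++
        yield(i)
    }
}
```

Calling `foo(5).take(2).toList()` returns `[1, 2]` and leaves `produced` at 2, since the remaining values are never requested.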

Hot channels

We can use the ReceiveChannel type from the kotlinx.coroutines library to represent an asynchronous stream of values:

fun fooProducer(p: Params): ReceiveChannel<Value> =
    GlobalScope.produce { while (hasMore) send(nextValue) }

However, we run into the same problem as with futures. The channel represents a hot stream of values. There is a coroutine on the other side of the channel working to produce the values, so we cannot just drop our reference to the ReceiveChannel: the producer would stay suspended forever waiting for a consumer, wasting memory, holding open network connections, and so on.
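The sketch below makes the hot behavior visible. It assumes a current kotlinx.coroutines release and, unlike the GlobalScope version above, declares the producer on a CoroutineScope so that the example can clean up after itself; the buffer size of 3 and the `sent` counter are illustrative choices. Even though nobody ever receives from the channel, the producer fills the buffer on its own and then sits suspended in send until the channel is cancelled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Counts how many values the producer has sent so far.
var sent = 0

// Hypothetical producer with a small buffer, so it can run ahead of any consumer.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.fooProducer(): ReceiveChannel<Int> = produce(capacity = 3) {
    var i = 0
    while (true) {
        send(i++)  // returns immediately while the buffer has room, then suspends
        sent++
    }
}

// Creates the channel, never receives from it, and reports how much work was already done.
fun sentWithoutConsumer(): Int = runBlocking {
    val values = fooProducer()
    delay(100)       // give the producer a chance to run; nobody is receiving
    values.cancel()  // without this, runBlocking would wait for the producer forever
    sent
}
```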

Structured concurrency⁴ somewhat alleviates the problem. Observe that fooProducer launches a coroutine that works concurrently with the rest of the code. We can make this concurrency explicit⁵ by declaring the fooProducer function as an extension on CoroutineScope⁶:

fun CoroutineScope.fooProducer(p: Params): ReceiveChannel<Value> =
    produce { while (hasMore) send(nextValue) }

However, this does not solve the problem completely; it only changes the effect of our bugs. Without structured concurrency, lost channels are like lost futures: they produce silent resource leaks. With structured concurrency, lost channels prevent the outer coroutine scope from completing, effectively “hanging” ongoing operations. The latter effect is easier to notice during testing, but it is still a bug. We still cannot write something like this:

val values: ReceiveChannel<Value> = fooProducer(p)
if (someCondition) return anotherResult // Oops! Leaked values
// ... do further work with values ...
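The hanging effect can be reproduced with a sketch like the following, assuming a current kotlinx.coroutines release; `fooProducer` here is a hypothetical infinite producer of Ints, and withTimeoutOrNull is used only so that the hang shows up as a `false` result rather than a stuck program. Returning early without cancelling the channel leaves the producer suspended in send, so coroutineScope never completes; cancelling the channel lets it finish.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical infinite producer, declared as an extension on CoroutineScope.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.fooProducer(): ReceiveChannel<Int> = produce {
    var i = 0
    while (true) send(i++)  // suspends until a consumer calls receive()
}

// Returns true if the scope completed within 500 ms, false if it "hung".
suspend fun scopeCompletes(cancelChannel: Boolean): Boolean =
    withTimeoutOrNull(500L) {
        coroutineScope<Unit> {
            val values = fooProducer()
            values.receive()                    // use one value, then "return early"
            if (cancelChannel) values.cancel()  // releasing the producer lets the scope finish
        }
        true
    } ?: false
```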

All in all, working with channels is not as simple as working with single values using suspending functions, or with a synchronous Sequence of values, and it involves subtle problems and conventions due to concurrency.

Channels are a great fit for modeling data sources that are intrinsically hot, that is, data sources that exist without any request from the application: incoming network connections, event streams, etc.

Channels, just like futures, are synchronization primitives. You should use a channel when you need to send data from one coroutine to another, in the same or in a different process, because different coroutines are concurrent, and you need synchronization to work with any data in the presence of concurrency⁷. However, synchronization always comes at a performance cost.
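As a minimal illustration of a channel used as a synchronization primitive, assuming a current kotlinx.coroutines release: one coroutine, running on Dispatchers.Default, sends squares through a rendezvous Channel, and another receives them. The function name and the squares are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Sends the squares of 1..n from one coroutine to another through a channel.
fun squaresViaChannel(n: Int): List<Int> = runBlocking {
    val channel = Channel<Int>()    // rendezvous: each send waits for a matching receive
    launch(Dispatchers.Default) {   // the producer runs concurrently, possibly on another thread
        for (i in 1..n) channel.send(i * i)
        channel.close()             // signal that no more values will come
    }
    val received = mutableListOf<Int>()
    for (v in channel) received += v  // iterates until the channel is closed
    received
}
```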

Cold flows

But what if we need neither concurrency nor synchronization, just non-blocking streams of data? Until recently we did not have a type for that, so welcome the Kotlin Flow type, available as a preview starting from kotlinx.coroutines version 1.2.0-alpha-2:

fun foo(p: Params): Flow<Value> =
    flow { while (hasMore) emit(nextValue) }

Just like a sequence, a flow represents a cold stream of values. The caller of foo gets a reference to the flow instance, but the code inside the flow { ... } builder is not active and no resources are bound to it yet. Like sequences, flows can be transformed with common operators such as map and filter. Unlike a sequence, a flow is asynchronous and allows calls to suspending functions anywhere in its builder and operators. For example, the following code defines a flow of ten integers with a 100 ms delay before each of them:

val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}
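The sketch below extends the ints flow with filter and map, assuming a current kotlinx.coroutines release (the preview API described in this post may differ in details). The `builderRuns` counter and the `evenSquares` name are hypothetical; they show that declaring the flow and its operators runs nothing, and that operator lambdas such as map may call suspending functions like delay.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Counts how many times the flow builder has started running.
var builderRuns = 0

val ints: Flow<Int> = flow {
    builderRuns++
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

// Operators are cold too: this only describes a pipeline, nothing runs yet.
val evenSquares: Flow<Int> = ints
    .filter { it % 2 == 0 }
    .map { delay(10); it * it }  // a suspending call inside an operator
```

Collecting `evenSquares` (for instance with `toList()` inside runBlocking) yields `[4, 16, 36, 64, 100]` and runs the builder once.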

Terminal operators on a flow collect all the values emitted by the flow, activating the flow code only for the duration of the corresponding operation. This is what makes the flow cold: it is not active before the call to the terminal operation and not active after it, and it releases all resources before returning from the call. The most basic terminal operator is called collect. It is a suspending function that suspends the calling coroutine while the flow is being collected:

ints.collect { println(it) } // takes 1 second, prints 10 ints
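To see the cold lifecycle in action, here is a sketch, again assuming a current kotlinx.coroutines release, where a hypothetical `log` stands in for acquiring and releasing a resource. Each call to collect runs the builder from scratch, and the finally block runs before collect returns.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records what happens inside the flow so the cold lifecycle is visible.
val log = mutableListOf<String>()

val ints: Flow<Int> = flow {
    log += "open"       // stands in for acquiring a resource (hypothetical)
    try {
        for (i in 1..3) emit(i)
    } finally {
        log += "close"  // released before the terminal operation returns
    }
}

fun collectTwice(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    ints.collect { seen += it }  // the first terminal operation runs the builder
    ints.collect { seen += it }  // the second one runs it again from scratch
    seen
}
```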

Conclusion

The Flow is in preview⁸ and your feedback is welcome⁹. We can still tweak the API and implementation. Unlike channels, flows do not inherently involve any concurrency. They are non-blocking, yet sequential. The goal of flows is to become for asynchronous data streams what suspending functions are for asynchronous operations: convenient, safe, easy to learn, and easy to use.
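The sequential nature of flows can be observed with a sketch like this one (assuming a current kotlinx.coroutines release; the `trace` function and event strings are hypothetical). Each emit call runs the collector's lambda before returning, so emission and collection interleave in a single coroutine, with no second coroutine and no buffer involved.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order of emissions and collections within one coroutine.
fun trace(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        for (i in 1..3) {
            events += "emit $i"
            emit(i)  // runs the collector's lambda before returning
        }
    }
    numbers.collect { events += "collect $it" }
    events
}
```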

It is too early to discuss the performance of flows, since no performance optimizations have been applied to the code yet, but they promise to considerably exceed the performance of synchronization primitives. I’ll explore these aspects in future updates. Stay tuned¹⁰.

Footnotes and further reading

  1. ^ Futures, cancellation and coroutines
  2. ^ buildList is not an actual function in Kotlin, but it has been proposed: KT-15363
  3. ^ Blocking threads, suspending coroutines
  4. ^ Structured concurrency
  5. ^ Explicit concurrency
  6. ^ Coroutine Context and Scope
  7. ^ What is “concurrent” access to mutable state
  8. ^ There are no binary nor source compatibility guarantees whatsoever for flow preview at this moment. It will be changed in future releases based on your feedback and will be stabilized later.
  9. ^ Please submit feedback via GitHub issues for kotlinx.coroutines.
  10. ^ Follow my twitter account to get timely updates from me.

Written by Roman Elizarov

Project Lead for the Kotlin Programming Language @JetBrains
