ProAndroidDev

The latest posts from Android Professionals and Google Developer Experts.


Kotlin Coroutine Flows: Deep Dive (Part 1 Cold Flows)


As Android projects migrate to Kotlin, many developers are turning to Kotlin’s coroutines library for asynchronous programming. One of the most popular features of coroutines is the Flow API, which allows for asynchronous data streams to be processed in a more efficient and streamlined manner.

Flow is essentially a reactive stream of data that can be emitted from a producer and collected by a consumer. In other words, Flow is a way of asynchronously emitting a sequence of values over time.

With Flow, data can be emitted in a non-blocking manner, which makes it ideal for handling long-running or I/O-bound operations. Unlike LiveData, which is a lifecycle-aware holder for observable UI state, Flow is designed to handle arbitrary data streams.

To create a Flow, one needs to define a producer that emits a sequence of data items, and a consumer that collects these items. The producer is responsible for emitting data items by calling the emit() function, and the consumer is responsible for collecting these items by calling the collect() function.

The collect() function is a suspending function, which means that it can only be called from a coroutine or from another suspending function. This allows Flow to seamlessly integrate with coroutines, making it an ideal choice for asynchronous programming on Android.

In the next sections, we’ll explore the Flow API in more detail and show how it can be used to handle asynchronous data streams in Android applications.

Introduction:

In Coroutine Flow, a Flow is a cold asynchronous stream that can emit multiple values over time. It is defined by the Flow interface, which has a single method called collect. The collect method takes a FlowCollector interface that has a single method called emit. The emit method is used to emit a value to the FlowCollector, which is then propagated downstream.

Let’s see how Flows are created. Let’s say that we have these simplified versions of the Flow and FlowCollector interfaces:

fun interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

The Flow interface represents the data stream that emits data items, and the FlowCollector interface represents the data receiver that consumes data items emitted by the Flow.

To use these interfaces to create a flow, we can implement Flow with an anonymous object whose collect function receives a FlowCollector and emits some data items:

val flow = object : Flow<String> {
    // collect must be declared suspend to override the interface method
    override suspend fun collect(collector: FlowCollector<String>) {
        collector.emit("Hello")
        collector.emit("World")
    }
}

The same flow can also be written using lambda syntax, here with a delay added between the two items:

val flow = Flow<String> { collector ->
    collector.emit("Hello")
    delay(1000)
    collector.emit("World")
}

This flow emits two strings: “Hello” and “World”, with a delay of one second in between. Note that the Flow lambda function is a suspending function, which means it can use delay or any other suspending function inside it.

We can then collect the data items emitted by the flow using the collect function, which takes a FlowCollector as an argument:

flow.collect(object : FlowCollector<String> {
    // emit must be declared suspend to override the interface method
    override suspend fun emit(value: String) {
        println(value)
    }
})

The above code can also be written using lambda syntax, as follows:

flow.collect { value ->
    println(value)
}
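To tie the pieces above together, here is a minimal runnable sketch built on the simplified interfaces from this section rather than on the real kotlinx.coroutines.flow types. The names greetings and collectToList are hypothetical helpers introduced only for this illustration, and the delay is shortened to keep the run quick.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the library interfaces, as described in the article.
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

fun interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

// Hypothetical example flow: the lambda body is the producer.
val greetings = Flow<String> { collector ->
    collector.emit("Hello")
    delay(10) // suspending calls are allowed inside the producer
    collector.emit("World")
}

// Hypothetical helper: runs the producer and gathers everything it emits.
suspend fun <T> Flow<T>.collectToList(): List<T> {
    val items = mutableListOf<T>()
    collect { value -> items.add(value) }
    return items
}

fun main() = runBlocking {
    println(greetings.collectToList()) // prints [Hello, World]
}
```

Nothing in this sketch runs until collect is called, which is what makes the flow cold: the producer lambda is only a description of work, and each call to collect executes it from the start.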

Upstream and Downstream

In the context of coroutine Flow, the term upstream refers to the source of data items that are emitted in the flow. The downstream receiver, on the other hand, is the consumer of these data items that are collected from the upstream source. It’s important to note that the downstream receiver can also be a flow itself, creating a chain of data processing stages where each stage collects data from the previous stage.
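One way to see both roles at once is to write a small intermediate operator by hand. The sketch below assumes the real kotlinx.coroutines.flow API; mapEach and shout are hypothetical names for this illustration, not library functions.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: a stage that sits between a source and its consumer.
fun <T, R> Flow<T>.mapEach(transform: suspend (T) -> R): Flow<R> = flow {
    collect { value ->             // this stage is downstream of the source flow
        emit(transform(value))     // and upstream of whoever collects the result
    }
}

// Hypothetical pipeline: source -> mapEach -> terminal collector.
fun shout(): Flow<String> = flowOf("hello", "world").mapEach { it.uppercase() }

fun main() = runBlocking {
    println(shout().toList()) // prints [HELLO, WORLD]
}
```

The same stage is a collector when looking toward the source and an emitter when looking toward the consumer, which is how chains of processing stages are formed.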


Flow Builders

Coroutine Flow provides several builders for creating Flows. Here are some common builders:

flowOf

The flowOf() function is used to create a Flow that emits a fixed set of values. Here’s an example:

val flow = flowOf("Hello", "World")

asFlow

The asFlow() function is used to convert any iterable or sequence to a Flow. Here’s an example:

val flow = listOf(1, 2, 3).asFlow()

flow

The flow {} builder function is used to create a custom Flow. It takes a suspending lambda that has a FlowCollector as its receiver, so the lambda can emit values by calling emit() directly. Here’s an example:

val flow = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
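The builders above all produce cold flows. As a rough sketch of what that means in practice, the example below counts how many times a flow {} body starts; builderRuns and countedFlow are hypothetical names used only for this demonstration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the flow { } body has started.
var builderRuns = 0

fun countedFlow(): Flow<Int> = flow {
    builderRuns++                 // executed once per collection, not at creation
    for (i in 1..3) emit(i)
}

fun main() = runBlocking {
    println(flowOf("Hello", "World").toList())     // prints [Hello, World]
    println(listOf(1, 2, 3).asFlow().toList())     // prints [1, 2, 3]

    val cold = countedFlow()
    println("runs before collecting: $builderRuns")
    cold.toList()
    cold.toList()
    println("runs after two collections: $builderRuns")
}
```

Creating the flow does not run the builder body; each collection runs it again from the beginning.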

Transformation Operators

Coroutine Flow provides a variety of operators for transforming flows. These operators allow you to create new flows by transforming the data emitted by a source flow. Here are some of the most commonly used transformation operators:

map

The map operator applies a transformation function to each element emitted by the source flow and emits the transformed element to downstream. Here's an example:

val numbers = (1..5).asFlow()
numbers.map { it * 2 }.collect { println(it) }

In this example, the numbers flow emits values from 1 to 5. The map operator applies the transformation function { it * 2 } to each of these values and emits the transformed values (2, 4, 6, 8, 10) to downstream.

filter

The filter operator filters the elements emitted by the source flow using a predicate function and emits only the elements that satisfy the predicate. Here's an example:

val numbers = (1..5).asFlow()
numbers.filter { it % 2 == 0 }.collect { println(it) }

In this example, the numbers flow emits values from 1 to 5. The filter operator filters out the odd values (1, 3, 5) and emits only the even values (2, 4) to downstream.

transform

The transform operator allows you to perform arbitrary transformations on the data emitted by the source flow. The transformation function takes a value and a FlowCollector, and emits zero or more values to the collector. Here's an example:

val numbers = (1..5).asFlow()
numbers.transform { value ->
    emit("A$value")
    emit("B$value")
}.collect { println(it) }

In this example, the numbers flow emits values from 1 to 5. The transform operator turns each value into two strings ("A$value" and "B$value") and emits them to downstream, so the collector receives A1, B1, A2, B2, and so on.

zip

The zip operator combines two flows by pairing their elements in order and applying a transform function to each pair. The resulting flow completes as soon as either source flow completes. Here's an example:

val a = (1..5).asFlow()
val b = (6..10).asFlow()
a.zip(b) { x, y -> x + y }.collect { println(it) }

In this example, the a flow emits values from 1 to 5 and the b flow emits values from 6 to 10. The zip operator combines these flows into a single flow of pairs and emits the sum of each pair (7, 9, 11, 13, 15) to downstream.

These are just a few of the many transformation operators available in Coroutine Flow. Using these operators, you can easily create new flows by transforming the data emitted by existing flows.
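Because every operator returns a new flow, they can be chained. The following sketch combines filter, map, and zip in one pipeline; labelledSquares is a hypothetical name, and the labels are made up for the example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical pipeline: keep even numbers, square them, then label them.
fun labelledSquares(): Flow<String> =
    (1..10).asFlow()
        .filter { it % 2 == 0 }                 // 2, 4, 6, 8, 10
        .map { it * it }                        // 4, 16, 36, 64, 100
        .zip(flowOf("a", "b", "c")) { n, label -> "$label=$n" }

fun main() = runBlocking {
    // zip stops after three items because the label flow has only three values.
    println(labelledSquares().toList()) // prints [a=4, b=16, c=36]
}
```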

Terminal Operators

A terminal operator is a suspending function that starts the collection of a Flow. Because flows are cold, nothing is emitted until a terminal operator is called, and the operator returns once the flow has completed.

Here are some common terminal operators in Kotlin Flows. Each one is a suspend function and must be called from within a coroutine:

toList() - Collects all the emitted values and returns them as a list.

toSet() - Collects all the emitted values and returns them as a set.

count() - Counts the number of emitted values and returns the result as an integer.

reduce() - Applies a binary operation to the emitted values and returns a single result. It throws NoSuchElementException if the flow is empty.

fold() - Applies a binary operation to the emitted values, starting from an initial accumulator value, and returns a single result.

collect() - Collects the emitted values and applies the specified action to each one.
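As a quick illustration of the operators listed above, here is a small sketch that runs several of them against the same flow. The numbers function is a hypothetical helper; since the flow is cold, each terminal operator starts a fresh collection.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

// Hypothetical source flow used by every operator below.
fun numbers(): Flow<Int> = (1..5).asFlow()

fun main() = runBlocking {
    println(numbers().toList())                          // prints [1, 2, 3, 4, 5]
    println(flowOf(1, 1, 2).toSet())                     // prints [1, 2]
    println(numbers().count())                           // prints 5
    println(numbers().reduce { acc, v -> acc + v })      // prints 15
    println(numbers().fold(100) { acc, v -> acc + v })   // prints 115
}
```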

Flattening Flows

Flattening flows is the process of taking a flow of flows and transforming it into a single, flat flow that emits items from all of the nested flows.

The flatMapConcat operator is used when you want to merge multiple flows into a single flow by sequentially concatenating them one after another. It takes a function that transforms each emitted value of the source flow to another flow and then flattens the resulting flows. Each inner flow is collected to completion before the next one starts, so the downstream collector receives all values of the first inner flow, then all values of the second, and so on, each in its original order.

val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D", "E", "F")
val flow3 = flowOf("G", "H", "I")

flowOf(flow1, flow2, flow3)
    .flatMapConcat { it }
    .collect {
        println(it)
    }

// Output: A B C D E F G H I

The flatMapMerge operator is used when you want to merge multiple flows into a single flow without any specific order, meaning that the downstream collector may receive values in any order. It takes a function that transforms each emitted value of the source flow to another flow, collects the resulting flows concurrently, and flattens them. Values from a single inner flow keep their relative order, but values from different inner flows may be interleaved.

val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D", "E", "F")
val flow3 = flowOf("G", "H", "I")

flowOf(flow1, flow2, flow3)
    .flatMapMerge { it }
    .collect {
        println(it)
    }

// One possible output (the interleaving is not guaranteed): A D G B E H C F I

The flatMapLatest operator is used when you want to transform the values emitted by a flow to another flow, but you only want to collect values from the most recent transformed flow, and ignore any previously transformed flows. It takes a function that transforms each emitted value of the source flow to another flow and then flattens the resulting flows. When a new value is emitted by the source flow, the collection of the previous inner flow is cancelled and the new inner flow is collected instead.

val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D", "E", "F").onEach { delay(100) }
val flow3 = flowOf("G", "H", "I")

flowOf(flow1, flow2, flow3)
    .flatMapLatest { it }
    .collect {
        println(it)
        delay(250)
    }

// Output: A, B, C, G, H, I
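The three flattening operators are easier to compare when the inner flows take some time. The sketch below uses a hypothetical request function that emits a start marker, waits, and emits an end marker. The @OptIn annotations are an assumption about the library version in use, since these operators have been marked experimental or preview in kotlinx.coroutines releases.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical inner flow: simulates a request that takes 100 ms.
fun request(id: Int): Flow<String> = flow {
    emit("$id-start")
    delay(100)
    emit("$id-end")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concatenated(): Flow<String> = flowOf(1, 2, 3).flatMapConcat { request(it) }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun merged(): Flow<String> = flowOf(1, 2, 3).flatMapMerge { request(it) }

@OptIn(ExperimentalCoroutinesApi::class)
fun latest(): Flow<String> = flowOf(1, 2, 3).flatMapLatest { request(it) }

fun main() = runBlocking {
    // One request at a time: prints [1-start, 1-end, 2-start, 2-end, 3-start, 3-end]
    println(concatenated().toList())
    // All requests run concurrently, so the order across requests may vary.
    println(merged().toList())
    // Requests 1 and 2 are cancelled during their delay: prints [1-start, 2-start, 3-start, 3-end]
    println(latest().toList())
}
```

With flatMapLatest, only the last request gets to finish, which is why the end markers for the first two requests never appear.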

Exception Transparency

Exception transparency refers to the ability of a program to handle exceptions in a transparent and predictable manner, without compromising the correctness or safety of the program. For flows, this means that a failure raised by a downstream collector must always reach that collector's caller: the emitter must not catch or swallow exceptions thrown from emit(), so that it stays clear what the error is and where it occurred.

What’s wrong with this code?

try {
    flow.collect { println(it) }
} catch (e: RuntimeException) {
    println("Error")
}

When you wrap the collect function inside a try-catch block, you catch every exception thrown while the flow is being collected, whether it comes from the upstream emitter, from an intermediate operator, or from the collect lambda itself. The catch block cannot tell these cases apart, which makes it difficult to trace the origin of the exception and makes debugging more difficult.

Instead, the recommended approach is to handle upstream failures declaratively with the catch operator, which only sees exceptions thrown above it in the chain. Exceptions thrown by the collector itself still propagate up the call stack to the calling code, which can handle them appropriately.

val flow = flow {
    emit(1)
    throw RuntimeException("Oops!")
    emit(2) // never reached
}.catch { e ->
    emit(-1)
}

flow.collect { value ->
    println(value)
}

In this example, we’ve added a catch operator to the flow. When the exception is thrown, the catch operator intercepts it and emits a value of -1. The downstream receiver receives this value instead of the exception, and the flow then completes normally: emit(2) is never reached, so the collector prints 1 and then -1.

This behavior is possible because exceptions in Coroutine Flows always travel downstream through the operator chain: an emitter never swallows a failure raised by its collector, and the catch operator handles only failures that originate upstream of it. This makes handling exceptions in Coroutine Flows more transparent and less error-prone.
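The difference between upstream and downstream failures can be checked with a short sketch. The names failing, recovered, and collectorFailureEscapes are hypothetical helpers written for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flow that fails after its first value.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("Oops!")
}

// The upstream failure is replaced by a fallback value, then the flow completes.
fun recovered(): Flow<Int> = failing().catch { emit(-1) }

// A failure thrown inside the collector is NOT handled by the catch operator;
// it reaches the code that called collect.
suspend fun collectorFailureEscapes(): Boolean =
    try {
        flowOf(1, 2)
            .catch { emit(-1) }
            .collect { value -> require(value != 2) { "collector rejected $value" } }
        false
    } catch (e: IllegalArgumentException) {
        true
    }

fun main() = runBlocking {
    println(recovered().toList())        // prints [1, -1]
    println(collectorFailureEscapes())   // prints true
}
```

If collector failures also need to be handled declaratively, one option is to move the collector logic into onEach before the catch operator and finish the chain with a parameterless collect().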

Execution context

Have you ever tried to change the coroutine dispatcher inside a flow builder?

flow {
    withContext(Dispatchers.Default) {
        while (isActive) {
            emit(someData())
        }
    }
}

If you collect this flow, it fails with an exception whose message starts like this:

Flow invariant is violated: Flow was collected in [CoroutineId(2), “coroutine#2”:StandaloneCoroutine{Active}@4756c09e, Dispatchers.Default], but emission happened in

The execution context of a Kotlin Flow is crucial to understanding why you cannot use withContext inside a flow to change the coroutine context in which an item is emitted. By default, the code in a flow builder runs in the coroutine context of the collector, that is, the context from which collect is called. This property is called context preservation, and it determines the dispatcher and threads on which values are emitted.

Consider an Android app that wants to update the UI with data from a flow. In this case, the flow needs to emit items on the UI thread to avoid concurrency issues. If you use withContext inside the flow to change the coroutine context and emit items on a background thread, the UI update may occur on a different thread than the one on which the UI was created, leading to exceptions and crashes.

Additionally, if you allow flows to emit items in different coroutine contexts, every flow collector would need to write boilerplate code to ensure that its block of execution happens in the correct context. Alternatively, you would need to establish project-wide rules and limitations on the context in which elements of the flow are allowed to be emitted, which can be cumbersome and difficult to enforce.

To address these issues, you should use the flowOn operator to change the execution context of a flow.

flowOn

flow
    .flowOn(Dispatchers.IO)
    .collect { println(it) }

The flowOn operator changes the context of the upstream emitter only. When the dispatcher differs from the collector's, the upstream part of the flow runs in a separate coroutine that uses the specified dispatcher, and the values it emits are passed to the collector through a channel. When calling the Flow.collect { } function, data is received from this channel, so the downstream collector keeps running in its own coroutine context while the emitter runs in the context given to flowOn. This enables you to control the coroutine context in which upstream processing occurs while ensuring that the flow remains exception-transparent and concurrency-safe.
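A small experiment makes the split between upstream and downstream contexts visible. In this sketch, producerThreadNames is a hypothetical helper that emits the name of the thread running the flow body; the exact thread names depend on the platform and library version.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports which thread executes the producer block.
fun producerThreadNames(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // affects only the code above this operator

fun main() = runBlocking {
    producerThreadNames().collect { producerThread ->
        // The collector still runs in the runBlocking context (the calling thread).
        val collectorThread = Thread.currentThread().name
        println("emitted on $producerThread, collected on $collectorThread")
    }
}
```

On the JVM, the producer is expected to report a worker thread of Dispatchers.Default, while the collector reports the thread that called runBlocking.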

Backpressure and Buffering

When dealing with large amounts of data, backpressure becomes an important concern. It refers to the ability of a consumer to signal a producer to slow down when the consumer is unable to keep up with the rate of production. In Kotlin Flows, backpressure is handled by suspending the producer when the downstream is not ready to receive more data.

Suppose you have a flow that emits a large number of items, and each item requires some time-consuming processing before it can be collected. The collector will wait for each item to be processed before requesting the next one. This can result in a significant delay in the overall execution time.

import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun slowFlow(): Flow<Int> = flow {
    repeat(10) {
        delay(100)
        emit(it)
    }
}

fun main() = runBlocking {
    measureTimeMillis {
        slowFlow().collect {
            delay(100)
            println("Received $it")
        }
        println("Done")
    }.let {
        println("Collected in $it ms")
    }
}

In this example, the slowFlow() function emits integers one by one with a delay of 100 milliseconds before each emission. The collector also spends 100 milliseconds on each item. Because the producer is suspended while the collector is busy, the two delays add up, and collecting all ten integers takes roughly 2 seconds (10 × 200 ms).

Buffer

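However, if we add a buffer to the flow, we can reduce the total time taken to complete the collection. The producer then runs in its own coroutine and keeps emitting into the buffer while the collector is still processing the previous item, so the same work finishes in roughly 1.1 seconds instead of 2.

import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun slowFlow(): Flow<Int> = flow {
    repeat(10) {
        delay(100)
        emit(it)
    }
}.buffer()

fun main() = runBlocking {
    measureTimeMillis {
        slowFlow().collect {
            delay(100)
            println("Received $it")
        }
        println("Done")
    }.let {
        println("Collected in $it ms")
    }
}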

The buffer function takes two parameters: capacity and onBufferOverflow.

The capacity parameter specifies the maximum number of values that can be stored in the buffer. If more values are emitted than the buffer can hold, the onBufferOverflow parameter specifies what action should be taken:

SUSPEND is the default value, and it means that if the buffer is full, any attempt to send a new element to the buffer will suspend the sender until space becomes available in the buffer.

DROP_OLDEST means that if the buffer is full, any attempt to send a new element to the buffer will drop the oldest element currently in the buffer and add the new element.

DROP_LATEST means that if the buffer is full, any attempt to send a new element to the buffer will drop the new element and keep the current buffer contents unchanged.
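To get a feel for the overflow strategies, the following sketch pairs a producer that never waits with a slow collector and a one-slot buffer. The names fastProducer and collectSlowly are hypothetical, and the exact set of values that survive a dropping strategy depends on timing, so the comments only state what is expected in general.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer that emits five values without any delay.
fun fastProducer(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}

// Hypothetical helper: collects slowly through a one-slot buffer.
suspend fun collectSlowly(strategy: BufferOverflow): List<Int> {
    val received = mutableListOf<Int>()
    fastProducer()
        .buffer(capacity = 1, onBufferOverflow = strategy)
        .collect { value ->
            delay(50) // the collector is much slower than the producer
            received.add(value)
        }
    return received
}

fun main() = runBlocking {
    // SUSPEND: the producer waits for space, so no value is lost.
    println(collectSlowly(BufferOverflow.SUSPEND))
    // DROP_OLDEST: older buffered values are discarded; the newest value is kept.
    println(collectSlowly(BufferOverflow.DROP_OLDEST))
    // DROP_LATEST: values arriving while the buffer is full are discarded.
    println(collectSlowly(BufferOverflow.DROP_LATEST))
}
```

SUSPEND trades speed for completeness, while the two dropping strategies keep the producer running at full speed at the cost of losing intermediate values.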
