An Early Look at Kotlin Coroutines’ Flow

Starting from version 1.2.0-alpha-2 of coroutines, the library will be adding a new type called Flow, which is an abstraction for a cold stream. To understand what this means and why it is needed, it helps to understand the difference between a hot stream and a cold stream.
What is a cold stream
Currently, a stream of items can be transferred using a Channel<T>. For example, using the produce coroutine builder, we can create a stream of items that can be consumed by a receiver, but in this case, the body of the producer represents a hot stream. The code within the producer is invoked and items are emitted with or without the presence of a consumer.
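A minimal sketch of such a producer, assuming the produce builder from kotlinx.coroutines.channels with a buffer capacity of 2. The names hotNumbers, runHotProducer and log are illustrative only. It records what the producer body does while no consumer is attached.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: starts a buffered producer and records what it does.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.hotNumbers(log: MutableList<String>): ReceiveChannel<Int> =
    produce(capacity = 2) {
        log += "producer body"
        for (i in 0 until 5) {
            send(i)              // suspends once the buffer of 2 is full
            log += "sent $i"
        }
    }

fun runHotProducer(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        val channel = hotNumbers(log)
        delay(100)               // nobody receives; just give the producer time to run
        channel.cancel()         // stop the suspended producer so runBlocking can return
    }
    return log
}

fun main() {
    runHotProducer().forEach { println(it) }
}
```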
The above snippet shows a producer that sends out a stream of Ints and returns a ReceiveChannel which a consumer can use to consume each emitted item. However, even without the presence of any consumer, the body of the producer executes. Before the code suspends, it outputs the following:
producer body
sent 0
sent 1
//code suspends at this point since buffer size is 2
Also, when the items sent through the channel have been consumed, no other consumer can consume those same items from the channel. The channel closes automatically when it is done emitting items with or without the items being consumed.
The snippet below shows a producer that sends items through its channel and completes successfully without the items being consumed by any consumer. Since this is a hot stream and the buffer size is larger than the number of emitted items, this code runs to completion without suspending.
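A sketch of that situation, again assuming the produce builder; the capacity of 10 and the three items are arbitrary choices, and runUnconsumedProducer is a made-up name. No one ever receives from the channel, yet the producer body runs to the end.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class)
fun runUnconsumedProducer(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        // Buffer (10) is larger than the number of items (3), so send never suspends.
        produce<Int>(capacity = 10) {
            for (i in 0 until 3) {
                send(i)
                log += "sent $i"
            }
            log += "producer done"
        }
        // runBlocking waits for the producer coroutine; nothing is ever received.
    }
    return log
}

fun main() {
    runUnconsumedProducer().forEach { println(it) }
}
```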
The property that the code within the producer gets executed and items are emitted without the presence of a consumer is what basically defines a hot stream. A side effect of this is that items are consumed only once.
A cold stream, on the other hand, is like a blueprint defining a computation that emits a stream of items. Nothing gets executed or emitted until a consumer requests it. The Observable and Flowable types in RxJava are examples of structures that represent a cold stream of items. Their bodies do not get executed until a subscriber subscribes. Upon subscription, each subscriber gets its own copy of the items emitted by the source.
The new Flow type is going to give us exactly this same behaviour. A call to any of its builders returns a flow instance which represents a computation that emits a stream of items. The execution of the code within the builder happens when a terminal operator is invoked.
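To make the cold behaviour concrete, here is a small sketch assuming the flow builder and the collect terminal operator from kotlinx.coroutines.flow (runColdFlow is an illustrative name). The body runs once per collector and not at all before the first collect call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runColdFlow(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow<Int> {
        log += "flow body"       // runs only when a collector shows up
        emit(1)
        emit(2)
    }
    log += "flow created"        // nothing has been emitted yet
    numbers.collect { log += "first got $it" }
    numbers.collect { log += "second got $it" }   // the body runs again from scratch
    log
}

fun main() {
    runColdFlow().forEach { println(it) }
}
```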
The Flow type itself is an interface with a single function, collect, that takes a FlowCollector. Items produced from the flow are sent out to the consumer through the FlowCollector, which is itself also a single-function interface. The emit function in the FlowCollector is called to emit produced items downstream.
Both the collect function from Flow and emit from FlowCollector are suspend functions, which means that back pressure is baked in by default. If the consumer consumes items slower than the producer produces them, the call to emit simply suspends until the consumer is able to receive the next item.
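The ordering below illustrates that back pressure. It is a sketch assuming the flow builder and collect from kotlinx.coroutines.flow, with a deliberately slow collector; runSlowConsumer is a made-up name. Each emit call returns only after the collector has finished with the item.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runSlowConsumer(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        for (i in 1..2) {
            log += "emitting $i"
            emit(i)                      // suspends until the collector has handled i
            log += "emit($i) returned"
        }
    }.collect { value ->
        delay(50)                        // a deliberately slow consumer
        log += "consumed $value"
    }
    log
}

fun main() {
    runSlowConsumer().forEach { println(it) }
}
```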
Flow Builders
To create a Flow, you need a Flow builder. The most important builder, which is fundamental to the others, is the flow function shown below:
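The shape of such a builder can be sketched with simplified stand-in interfaces. MiniFlow, MiniCollector and miniFlow are hypothetical names, not the library's types, and this sketch leaves out the SafeCollector wrapper that the library's builder applies.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the library interfaces (hypothetical names).
interface MiniCollector<in T> {
    suspend fun emit(value: T)
}

interface MiniFlow<out T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// The builder only stores the block; nothing runs until collect is called.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    object : MiniFlow<T> {
        override suspend fun collect(collector: MiniCollector<T>) {
            collector.block()    // no context check here, unlike the library builder
        }
    }

fun runMiniFlow(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val numbers = miniFlow<Int> {
        emit(1)
        emit(2)
    }
    numbers.collect(object : MiniCollector<Int> {
        override suspend fun emit(value: Int) {
            received += value
        }
    })
    received
}

fun main() {
    println(runMiniFlow())
}
```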
This function takes as a parameter a suspending extension function on a FlowCollector<T> and returns an implementation of a Flow which invokes our suspending function block when the collect function is called. The SafeCollector wrapper ensures that the coroutineContext where the Flow was collected is the same context used to emit the items. An exception gets thrown if you try to change context while emitting items. For example, the following code will throw an IllegalStateException:
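A sketch of code that trips this check, assuming the flow builder from kotlinx.coroutines.flow and withContext from kotlinx.coroutines; emitFromWrongContext is an illustrative name. The emit call happens inside a different dispatcher than the one the flow is collected in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun emitFromWrongContext(): String = runBlocking {
    val broken = flow<Int> {
        withContext(Dispatchers.Default) {
            emit(1)              // emitting from a context other than the collector's
        }
    }
    try {
        broken.collect { }
        "no error"
    } catch (e: IllegalStateException) {
        "IllegalStateException"
    }
}

fun main() {
    println(emitFromWrongContext())
}
```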
Since the suspending lambda we are passing to the flow function is an extension on FlowCollector, we are able to call emit within this lambda to send items to the collector. For example:
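A small sketch of calling emit inside the builder, assuming the flow builder from kotlinx.coroutines.flow. The countdown function and its delay are illustrative choices.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example flow: counts down from the given number to 1.
fun countdown(from: Int): Flow<Int> = flow {
    for (i in from downTo 1) {
        delay(10)                // suspending calls are allowed inside the builder
        emit(i)                  // emit is available because the lambda extends FlowCollector
    }
}

fun runCountdown(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    countdown(3).collect { seen += it }
    seen
}

fun main() {
    println(runCountdown())
}
```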
flowViaChannel is an alternative builder that allows you to create a flow from items sent to a channel. It takes an optional buffer size and a suspending lambda that takes a SendChannel as a parameter. Items sent to this channel are emitted as a Flow when a terminal operator is invoked.
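The same idea can be sketched with channelFlow, a channel-backed builder available in kotlinx.coroutines releases that ship it. This is a stand-in chosen as an assumption, not flowViaChannel itself, whose exact signature is not shown here. Items are sent to a channel inside the block and come out as a flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Stand-in for a channel-backed builder: send(...) feeds the resulting flow.
fun numbersViaChannel(): Flow<Int> = channelFlow {
    for (i in 1..3) {
        send(i)
    }
}

fun runChannelBackedFlow(): List<Int> = runBlocking {
    numbersViaChannel().toList()     // the block starts only when a terminal operator runs
}

fun main() {
    println(runChannelBackedFlow())
}
```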
flowOn and flowWith are operators that can be used to switch the flow’s context, similar to what we do with subscribeOn and observeOn in RxJava. flowOn is used to change the context of the upstream, i.e. everything before it:
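A sketch of flowOn affecting only what comes before it, assuming flowOn, map and single from kotlinx.coroutines.flow; runFlowOn is a made-up name. The builder body runs on Dispatchers.Default, while the map placed after flowOn runs in the collector's context.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (thread that ran the upstream, thread that ran the downstream).
fun runFlowOn(): Pair<String, String> = runBlocking {
    flow<String> {
        emit(Thread.currentThread().name)          // upstream: affected by flowOn
    }
        .flowOn(Dispatchers.Default)
        .map { upstream -> upstream to Thread.currentThread().name }   // downstream: collector's context
        .single()
}

fun main() {
    val (upstream, downstream) = runFlowOn()
    println("upstream ran on $upstream, downstream ran on $downstream")
}
```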
flowWith switches the context where the code within its body is executed. It takes a Dispatcher, an optional buffer size and a function which is an extension function on a Flow. Within this extension function, you define the set of operations you want to execute in the new context. For example:
There is also a builder function called flowOf that allows you to convert a vararg to a flow that emits each vararg item.
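For instance, assuming flowOf and toList from kotlinx.coroutines.flow (runFlowOf is an illustrative name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runFlowOf(): List<String> = runBlocking {
    flowOf("a", "b", "c").toList()   // each vararg item is emitted in order
}

fun main() {
    println(runFlowOf())
}
```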
An asFlow extension function is also defined on the following types, converting each of them to a Flow:
(() -> T).asFlow()
(suspend () -> T).asFlow()
Iterable<T>.asFlow()
Iterator<T>.asFlow()
Sequence<T>.asFlow()
Array<T>.asFlow()
IntArray.asFlow()
LongArray.asFlow()
IntRange.asFlow()
LongRange.asFlow()
Given the extension functions above, we can create a Flow in the following ways:
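A few of these conversions, sketched under the assumption that the asFlow extensions live in kotlinx.coroutines.flow. Only the collection-like receivers are shown, and runAsFlow is a made-up name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runAsFlow(): List<List<Int>> = runBlocking {
    listOf(
        listOf(1, 2, 3).asFlow().toList(),        // Iterable<T>
        (1..3).asFlow().toList(),                 // IntRange
        sequenceOf(1, 2, 3).asFlow().toList(),    // Sequence<T>
        arrayOf(1, 2, 3).asFlow().toList(),       // Array<T>
        intArrayOf(1, 2, 3).asFlow().toList()     // IntArray
    )
}

fun main() {
    runAsFlow().forEach { println(it) }
}
```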
Operators
Aside from the context-switching operators flowOn and flowWith, which I already introduced above, Flow comes with a host of other operators, most of which are the familiar and expected operators on a stream type. Some of these operators are listed below:
map
flatMapConcat
flatMapMerge
filter
filterNot
combineLatest
zip
distinctUntilChanged
drop
take
takeWhile
dropWhile
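A short sketch chaining a few of the operators listed above, assuming they behave as their names suggest and live in kotlinx.coroutines.flow; runOperators is an illustrative name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runOperators(): Triple<List<Int>, List<String>, List<Int>> = runBlocking {
    // Keep even numbers, square them, and stop after three items.
    val squares = (1..10).asFlow()
        .filter { it % 2 == 0 }
        .map { it * it }
        .take(3)
        .toList()

    // zip pairs items up and ends when the shorter flow ends.
    val zipped = flowOf(1, 2, 3)
        .zip(flowOf("a", "b")) { n, s -> "$n$s" }
        .toList()

    // Only consecutive duplicates are dropped.
    val distinct = flowOf(1, 1, 2, 2, 1).distinctUntilChanged().toList()

    Triple(squares, zipped, distinct)
}

fun main() {
    println(runOperators())
}
```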
Flow is not currently as operator-rich as RxJava, but at the time this post was written there were 27 operators on the Flow type. Work on the project is still ongoing and more operators might be added. On the other hand, the simplicity of the library makes it quite easy to write your own operators if the need arises. For example, I think an implementation of a mergeWith operator might look like this:
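One way such an operator could be sketched, assuming a kotlinx.coroutines release that provides the channelFlow builder (an assumption; it is not named in this post). Both flows are collected concurrently and their items are forwarded into a single flow, so the interleaving is not guaranteed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical operator: emits items from both flows as they arrive.
fun <T> Flow<T>.mergeWith(other: Flow<T>): Flow<T> {
    val source = this
    return channelFlow {
        launch { source.collect { send(it) } }   // forward items from the receiver flow
        launch { other.collect { send(it) } }    // forward items from the other flow
        // channelFlow completes once both child coroutines are done
    }
}

fun runMergeWith(): List<Int> = runBlocking {
    flowOf(1, 2).mergeWith(flowOf(3, 4)).toList()
}

fun main() {
    println(runMergeWith().sorted())
}
```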
Terminals
Terminal operators are the way we trigger the execution of a Flow and the consumption of the emitted items. As said earlier, a Flow represents a cold stream, which means it stays dormant until a terminal operator is invoked. The terminal operator calls the flow’s collect function, which in turn triggers the execution of the flow.
Central to all other Flow terminal operators is another collect function declared as an extension function on the Flow type. This collect extension function takes a suspending lambda that gets passed each item emitted from the flow. We can do whatever processing we want on the items within this lambda.
As shown in the image above, calling the collect terminal operator with a suspending lambda callback initiates a call to the Flow’s collect function, passing an implementation of a FlowCollector that forwards the emitted items to our lambda.
A sample usage is shown below:
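A minimal usage sketch, assuming flowOf and the lambda form of collect from kotlinx.coroutines.flow; runCollect is an illustrative name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runCollect(): List<String> = runBlocking {
    val lines = mutableListOf<String>()
    flowOf("a", "b", "c").collect { item ->
        lines += "got $item"     // called once per emitted item, in order
    }
    lines
}

fun main() {
    runCollect().forEach { println(it) }
}
```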
If you are expecting a single item from the Flow, use the Flow<T>.single() terminal operator. Flow<T>.singleOrNull() returns either a single item or null. These operators are roughly analogous to the Single and Maybe types in RxJava.
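A sketch of both operators, assuming single, singleOrNull and emptyFlow from kotlinx.coroutines.flow. Flows with more than one item are left out on purpose, since how they are handled is not described here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runSingle(): Triple<Int, Int?, Int?> = runBlocking {
    Triple(
        flowOf(42).single(),                 // exactly one item
        flowOf(7).singleOrNull(),            // one item, returned as-is
        emptyFlow<Int>().singleOrNull()      // no items, so null
    )
}

fun main() {
    println(runSingle())
}
```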
There is a count terminal operator and a second variant that takes a predicate. They return the number of emitted items and the number of items that satisfy the predicate, respectively.
To accumulate emitted items, you have the reduce and fold operators. These two terminal operators are similar in the way they work. They both take an accumulator function, but fold also takes an initial value that the accumulator begins with. At the end, they both return a single value which is the result of accumulating all emitted items using the accumulator suspending function.
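A sketch of the counting and accumulating terminal operators, assuming count, reduce and fold from kotlinx.coroutines.flow; runAccumulators is an illustrative name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runAccumulators(): List<Int> = runBlocking {
    val numbers = (1..4).asFlow()
    listOf(
        numbers.count(),                             // all items
        numbers.count { it % 2 == 0 },               // items matching the predicate
        numbers.reduce { acc, v -> acc + v },        // starts from the first item
        numbers.fold(100) { acc, v -> acc + v }      // starts from the initial value 100
    )
}

fun main() {
    println(runAccumulators())
}
```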
Finally, to get your emitted items as a list or as a set, you have the toList and toSet terminal operators, which return a List<T> and a Set<T> respectively.
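For example, assuming toList and toSet from kotlinx.coroutines.flow (runToCollections is an illustrative name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runToCollections(): Pair<List<Int>, Set<Int>> = runBlocking {
    val items = flowOf(1, 2, 2, 3)
    items.toList() to items.toSet()      // the set drops the duplicate 2
}

fun main() {
    println(runToCollections())
}
```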
Error Handling
Use onErrorReturn and onErrorCollect to recover from an exception thrown by an upstream flow. onErrorReturn returns a single item when an error occurs, while onErrorCollect can be used to switch to another flow entirely. For example:
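Both recovery styles can be sketched with the catch operator from kotlinx.coroutines.flow. It is used here as a stand-in and an assumption; these are not the onErrorReturn and onErrorCollect signatures themselves. The failing function is a made-up flow that throws after its first item.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical upstream that emits one item and then fails.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw IOException("boom")
}

fun runRecovery(): Pair<List<Int>, List<Int>> = runBlocking {
    // Recover with a single fallback item.
    val withFallbackItem = failing().catch { emit(-1) }.toList()
    // Recover by switching to another flow entirely.
    val withFallbackFlow = failing().catch { emitAll(flowOf(10, 20)) }.toList()
    withFallbackItem to withFallbackFlow
}

fun main() {
    println(runRecovery())
}
```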
Both operators take an optional predicate parameter if you only want to recover from the error if the error satisfies a particular condition. The default value for this predicate returns true for all errors.
With the retry operator, you can retry collecting a flow up to N times, or retry the collection when an optional predicate is satisfied. Note that retry only kicks in when the error occurs during collection and not while the downstream is processing emitted items.
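A sketch of retrying a flaky upstream, assuming a retry operator that takes the number of retries as its first argument. The attempts counter and the failure condition are made up for illustration.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun runRetry(): Pair<List<String>, Int> = runBlocking {
    var attempts = 0
    val flaky = flow<String> {
        attempts++
        // Fail on the first two collections, succeed on the third.
        if (attempts < 3) throw IOException("attempt $attempts failed")
        emit("ok")
    }
    val result = flaky.retry(retries = 2).toList()   // the whole flow is re-collected on failure
    result to attempts
}

fun main() {
    println(runRetry())
}
```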
In conclusion, a lot of work has already been done on this library by the awesome folks at JetBrains. More changes are expected before the final release but what has already been done is pretty impressive. I look forward to how this project develops in the future.
For more information and to follow the progress of this awesome library, check out the project on GitHub and this article on cold streams by Roman Elizarov.