Kotlin Flow benefits over RxJava
Lately I’ve been implementing a new project using Kotlin Asynchronous Flows instead of RxJava as I was used to. Why? I saw this talk from the past KotlinConf 2019 and I wanted to give it a try. In this story I will share the benefits Roman Elizarov explained in his talk and also some usages in comparison to the Rx equivalents.
First of all, I have to say that if you are used to RxJava you can learn how to implement flows pretty fast. The concepts are the same and they share lot of operator names. Also, if you know how to implement some behaviors in Rx you can look for the equivalent with flows.
Simplicity is the first adjective that comes to my mind to describe the framework
However, if you want to implement a Subject
related pattern you will have to use Channels
for now. It is not a problem, but you will end up having a lot of ExperimentalCoroutinesApi
annotations in your project. The good thing is they announced that they are going to implement a way to catch and share flows in StateFlow
(check here) so, hopefully, this will be fixed soon.
Simplicity
This is the first adjective that comes to my mind to describe the framework. Creating a Flow instance is super simple:
flow { emit("whatever") }
That’s it. You don’t have to deal with different methods to create a stream as we have in Rx. You don’t have to think if you have to use just
, create,
defer
or whichever of the multiple operators they have.
Also, flows are always cold observables (If you don’t know the difference between a cold and a hot observable you can read it here). So, you just create a flow and at the moment there is someone observing, it starts to emit.
It's not only more simple to create observables but also to transform them. In Rx we have operators to work with synchronous and asynchronous operations. For instance, map
is for synchronous operations, on the other hand, flatMap
is for asynchronous ones. Because of the fact that all flow operators accept a suspend
function, all of them are prepared for asynchronous operations. We don’t need both a map
and a flatMap
operator, just a map
one. Another example is the filter
method which is synchronous in Rx, while the equivalent flow one is asynchronous.
You can think that less operators means less power but, in my opinion, this is not the case because of the fact that…
Flows are composable
You can end up having the same behavior as an Rx operator just composing suspend methods. For instance, interval
is an Rx operator that emits a Long
every X time ( Observable.interval(1, TimeUnit.Seconds)
), you can implement it by composing:
And so on. You can end up having the same behaviors that you obtain through an Rx operator by composition. In my opinion this is good because you don’t have to know all the operators but just the way to implement them.
Another example: In Rx we have onErrorReturn
and onErrorResumeNext
to recover from an error and, in flow, we just have the method catch
instead. In Rx, you have to decide which operator to use depending on if you want to return an Observable
or a value. With catch
, you can compose how you want to recover from an error by doing an asynchronous operation or just returning a default value, etc.
flowReturningAnError.catch { emit(defaultValue) }
Backpressure handling
According to the RxJava docs:
In RxJava it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or subscriber can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.
Rx offers a whole page of the documentation to deal with backpressure. It offers multiple recommendations that help us to deal with it. A flow doesn’t have that problem. If the collector is doing a slower operation that takes more time to complete than the one that is emitting then the emitter waits until the suspended method is done to continue producing values. This is because of the nature of suspend functions and the way they are implemented. Flows are based on those functions, so they have to wait until each one is finished.
Context preservation
In RxJava, when you set up the Scheduler
where an Observable
emits items through the subscribeOn
or observeOn
operators and in the case that you set up just one of them then the whole stream will be executed on that Scheduler
(to know more about this topic you can read more about it here). For instance, if in one part of your code, you set up an Observable
this way:
Then, the second part will be executed in the Schedulers.io()
even if the two pieces of code are in different files, classes or what have you. So, the client may not be aware of it.
On the contrary, flows have context preservation. The equivalent of the one above:
The second part will be executed in the context of the runBlocking
coroutine scope instead of in Dispatchers.IO
. Therefore, if we are using an already implemented class, api, etc., we are protected against making a mistake setting up the context where our Flow has to be executed. With Rx we don’t have such protection.
In addition, I would like to add a couple of things. First,collect
is a suspend function, so it has to be executed inside a coroutine. Second, if we don’t specify the Dispatcher
, then the one from where collect
is called will be used.
Lifetime
Another main difference between Rx and Flow is the way their lifetime has to be managed. In Rx it is managed by a Disposable
and whenever you want to remove the subscription you have to clear that disposable or add it to a CompositeDisposable
to clear it together with other subscriptions.
As we see in the previous snippets, with Rx we obtain a standalone object that we can forget to dispose whenever is necessary.
On the other hand, Flows have to be launched in a Job
inside a CoroutineScope
and the Job
instance is returned where we launch the coroutine. It is harder for the developer to forget to manage the lifetime because, usually, the coroutine has to be launched inside a scope that is managed by the framework to be cleaned up. For instance:
Those scopes cancel the coroutine in case that the ViewModel
or the Fragment
are finished. Even if we use our own CoroutineScope
we know we have to deal with it, it is not and standalone object returned that we have to remember to manage.
Bonus points
- According to this github project Flow is a little bit faster than Rx
- You don’t need other external libraries than the
kotlinx-coroutines-core
one, the stable version of Flow was released in the1.3.0
version. - It has interoperability with Rx, check it here
Summarizing, I think it is a good point to start using Flow instead of Rx so we can start getting used to it. Also, the stable version was released in the 1.3.0
version of the kotlin-coroutines-core
so, if you don’t mind having to deal with ConflatedBroadcastChannel
until they release the StateFlow
, you should give it a try!
Thanks for reading! 😻 😻