image by Patrick Tomasso, Unsplash

Kotlin SharedFlow or: How I learned to stop using RxJava and love the Flow

Aleksandra Krzemień
Published in ProAndroidDev
May 15, 2021


Coroutines are great. Period. When they became stable with Kotlin 1.3 we immediately knew that they were going to be a game changer. And they are indeed. Easy to learn. Easy to understand. And so easy to use. When coroutines 1.0.0 were released, many anticipated that it would be the end for RxJava on Android.

It turned out that despite coroutines being great, RxJava was still being picked for many new projects. Why? Probably one of the reasons was that it was well known, sure. But there was also a more important reason: coroutines still couldn’t be used in some use cases where RxJava could.

RxJava is not only Single

In the majority of apps, RxJava was used mostly for handling API requests. We all know the drill: make a REST API call with Retrofit, return an RxJava Single, apply proper schedulers, and then map the response. In this scenario, coroutines are a great fit. We just need to change our endpoint methods to suspending ones, and we’re more or less ready to go. But of course, RxJava is much more than that, right? The real strength of reactive programming is processing ongoing data streams.

Example 1: we have a multi-step data upload process in the app (done on a background thread), and each step should cause a realtime update of the UI. Seems like a great case for good old Observable and its onNext(). This scenario could not be handled with plain coroutines, as suspend functions can return only a single value. So we lacked a proper tool. It all changed with the introduction of coroutines 1.2.0, when we were given Kotlin Flow (as a preview at first; it became stable in 1.3.0). A mighty Kotlin Flow that handled (almost) all cases that previously could be handled only with RxJava and, most importantly, handled them using coroutines.

If you have any experience with both RxJava and coroutines, switching to Flows should be really seamless, as their syntax and logic build on both of these concepts. Let’s see the example code for Example 1.

Using RxJava
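Here is a minimal sketch of what the RxJava side of Example 1 could look like. The UploadStep enum and the uploadSteps() function are hypothetical names made up for illustration, and the imports assume RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical upload steps, used only for this illustration.
enum class UploadStep { COMPRESSING, SENDING, VERIFYING, DONE }

// Cold Observable: the lambda runs only once something subscribes.
fun uploadSteps(): Observable<UploadStep> =
    Observable.create<UploadStep> { emitter ->
        UploadStep.values().forEach { step ->
            // The real work for this step would happen here.
            emitter.onNext(step)
        }
        emitter.onComplete()
    }.subscribeOn(Schedulers.io()) // do the work off the calling thread

fun main() {
    // Blocks the main thread only so that this small demo does not exit early.
    uploadSteps().blockingForEach { step -> println("Now at: $step") }
}
```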
And using Flow
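A minimal sketch of the Flow side of Example 1 might look like this. The UploadStep enum and the uploadSteps() function are hypothetical names made up for illustration, and calling collect with a plain lambda assumes kotlinx.coroutines 1.6 or newer.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical upload steps, used only for this illustration.
enum class UploadStep { COMPRESSING, SENDING, VERIFYING, DONE }

// Cold flow: the builder block runs only once something collects it.
fun uploadSteps(): Flow<UploadStep> = flow {
    for (step in UploadStep.values()) {
        // The real work for this step would happen here.
        emit(step)
    }
}.flowOn(Dispatchers.IO) // run the builder block on a background dispatcher

fun main(): Unit = runBlocking {
    // Assumes kotlinx.coroutines 1.6+, where collect accepts a lambda directly.
    uploadSteps().collect { step -> println("Now at: $step") }
}
```

In an app, the collect call would live in a coroutine scope owned by the screen rather than in runBlocking, which is used here only to keep the sketch runnable.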

Now we just need to subscribe() our Observable or collect() our Flow, and we’ve got all we need. Easy peasy. This way coroutines made another step to replace RxJava. However, that wasn’t the last step.

You’re hot then you’re cold

For anyone working with RxJava a bit more extensively, the concept of cold/hot streams is well known. Streams created by Observable.create() or Flowable.just() are cold, which means that they start executing the code and emitting values only when something subscribes to them. This case can also be handled using flow{} or flowOf(): flows created with these methods are called cold flows (no surprise here).

The last big RxJava use case that coroutines were still missing was handling hot flows. In reactive programming hot streams are the ones that don’t have to be observed by anything to emit values. This makes them a perfect candidate e.g. for handling BLE connection state: the app knows about connection state changes at all times, and informs observers if there are any, or as soon as they arrive (Example 2). Hot streams in RxJava can be created using e.g. PublishSubject and BehaviorSubject. Here is how we can use them to implement Example 2:
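A rough sketch of Example 2 built on BehaviorSubject, assuming RxJava 3. ConnectionState, BleConnectionMonitor and onStateChanged() are hypothetical names, and the actual BLE callbacks are left out.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Hypothetical connection states, used only for this illustration.
enum class ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

class BleConnectionMonitor {
    // Hot stream: it holds the latest state whether or not anyone observes it.
    private val stateSubject = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED)

    // Observers only get a plain Observable, so they cannot push values.
    val state: Observable<ConnectionState> = stateSubject.hide()

    // The current value has to be exposed separately.
    val currentState: ConnectionState
        get() = stateSubject.value ?: ConnectionState.DISCONNECTED

    // Would be called from the (omitted) BLE callbacks.
    fun onStateChanged(newState: ConnectionState) = stateSubject.onNext(newState)
}

fun main() {
    val monitor = BleConnectionMonitor()
    monitor.onStateChanged(ConnectionState.CONNECTING) // nobody is observing yet

    val seen = mutableListOf<ConnectionState>()
    val disposable = monitor.state.subscribe { seen += it } // gets CONNECTING right away
    monitor.onStateChanged(ConnectionState.CONNECTED)
    disposable.dispose()

    println(seen) // [CONNECTING, CONNECTED]
}
```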

For a long time this functionality was missing from the stable coroutines API, and that changed with the release of coroutines 1.4.0 in October 2020. But let’s take a step back. Even before Kotlin Flow was released, we had heard of a concept called channels. It was designed to solve the producer-consumer problem, of course using coroutines. The Channel interface, which offers suspendable send() and receive() methods, seemed quite straightforward, but in reality it is a bit more complex. It uses independent coroutines on both sides of the communication, which introduces concurrency that we have to handle ourselves and may lead to issues with concurrent access to the same resources. It also didn’t help that the channels API was still not fully stable: there were (and still are) many experimental/preview methods there. Luckily for us, the Kotlin team came up with SharedFlow and StateFlow, which don’t mess with multiple coroutines and make our data flow much cleaner. If you want to read more about the relation between channels and flows, take a look at this great article by Roman Elizarov.
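To make the channel idea a bit more concrete, here is a tiny producer-consumer sketch. The produceAndConsume() function is a hypothetical name, and runBlocking is used only to keep the example self-contained.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun produceAndConsume(): List<Int> = runBlocking {
    // Default (rendezvous) channel: send() suspends until a receiver is ready.
    val channel = Channel<Int>()

    // The producer runs in its own coroutine.
    launch {
        for (i in 1..3) channel.send(i)
        channel.close() // without this the loop below would never finish
    }

    // The consumer is this coroutine; iterating suspends until a value arrives.
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(produceAndConsume()) // [1, 2, 3]
}
```

Note that there are already two coroutines involved for a single stream of values, which is exactly the extra concurrency mentioned above.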

Now to the best part. SharedFlow is an equivalent of RxJava’s PublishSubject. It allows us to create hot flows and specify strategies for handling backpressure and replay. StateFlow is a special case of SharedFlow which is an equivalent of RxJava’s BehaviorSubject. It creates a hot flow that always holds its latest value, which makes it a perfect candidate for handling BLE connection state from Example 2.

It’s worth noting that both Shared and State flows can be used in their mutable form internally and be exposed in a read-only form, a concept well known to anyone using LiveData. In RxJava we had to hide BehaviorSubject behind a plain Observable and expose the current state separately. With flows, the read-only StateFlow still allows us to get its latest value.
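A minimal sketch of Example 2 with StateFlow, showing the mutable/read-only split. ConnectionState, BleConnectionMonitor and onStateChanged() are hypothetical names, and the actual BLE callbacks are left out.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical connection states, used only for this illustration.
enum class ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

class BleConnectionMonitor {
    // The mutable form stays private to the class that produces the values.
    private val _state = MutableStateFlow(ConnectionState.DISCONNECTED)

    // Read-only view for everyone else; it still exposes the latest value.
    val state: StateFlow<ConnectionState> = _state.asStateFlow()

    // Would be called from the (omitted) BLE callbacks.
    fun onStateChanged(newState: ConnectionState) {
        _state.value = newState
    }
}

fun main() {
    val monitor = BleConnectionMonitor()
    monitor.onStateChanged(ConnectionState.CONNECTED)
    println(monitor.state.value) // CONNECTED
}
```

Unlike the BehaviorSubject approach, there is no need for a separate property for the current state: state.value is available on the read-only type.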

To use SharedFlow and StateFlow correctly we need to remember a few things:

  • StateFlow is conflated, which means that a slow collector may only see the most recent value, and that if we update its value with a new value that is equal to the previous one, the update will not be propagated.
  • SharedFlow needs to have a proper replay/buffer configuration. With the default one (no replay, no extra buffer) a producer calling emit() is suspended until all current collectors have received the value, and a value emitted while nobody collects is simply dropped. In most cases we just need to make sure that our producer will not get suspended by slow consumers, e.g. by setting replay or extraBufferCapacity to a positive value, optionally combined with an onBufferOverflow strategy other than SUSPEND.
  • StateFlow and SharedFlow will never complete, so you cannot depend on their onCompletion callback, and need to remember that collecting them will never finish normally (neither will the coroutine in which we called their collect).
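The three points above can be sketched roughly as follows. The function names are hypothetical, runBlocking is used only to keep the example runnable, and the collectors are started with CoroutineStart.UNDISPATCHED so that they are already subscribed before anything is emitted.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Points 1 and 3: equal values are skipped, and collection ends only because of take(3).
fun stateFlowSkipsEqualValues(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.take(3).toList(seen) // a plain collect would never return here
    }
    state.value = 1
    yield()         // let the collector process 1
    state.value = 1 // equal to the current value, so nothing is propagated
    yield()
    state.value = 2
    job.join()
    seen
}

// Point 2: with an active collector, tryEmit succeeds only if there is buffer space.
fun tryEmitWithSubscriber(flow: MutableSharedFlow<String>): Boolean = runBlocking {
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.take(1).toList()
    }
    val accepted = flow.tryEmit("event")
    subscriber.cancel() // shared flows never complete, so we stop collecting ourselves
    accepted
}

fun main() {
    println(stateFlowSkipsEqualValues())                                               // [0, 1, 2]
    println(tryEmitWithSubscriber(MutableSharedFlow<String>()))                        // false
    println(tryEmitWithSubscriber(MutableSharedFlow<String>(extraBufferCapacity = 1))) // true
}
```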

Ok, cool, but do we even need that?

For me SharedFlow and StateFlow were real game changers, and only when they were introduced did I decide that I could fully quit using RxJava. What is more, StateFlow recently received alpha support for data binding. And what is great is that using StateFlow in data binding is lifecycle aware, just like LiveData! No more updating UI when it’s not ready ❤ However, you need to remember that collecting a SharedFlow in a Fragment using lifecycleScope.launch{} is not lifecycle aware: you need to use launchWhenStarted or cancel the job when the app goes to the background. This topic has been well discussed in this article.

Now, some of you may say:

“Why do we even need SharedFlow? Most of the described use cases can be handled with LiveData!”

And then I’ll ask:

“Have you ever heard of clean architecture?”

Sure, when you need to observe changes from a ViewModel in a Fragment, LiveData is absolutely enough, but sometimes you need to observe events in different modules, or in general somewhere behind an abstraction layer. And LiveData unfortunately won’t work in this case because it is an Android class, and our domain module (or in general the base abstraction layer) should be pure Kotlin. This is where SharedFlow really shines, because it is a pure Kotlin type. We can expose it from our database or API without tightly coupling our domain code to Android classes. Sounds like a really nice approach 😀 What is more, it seems that SharedFlow can be used as an alternative solution for the SingleLiveEvent problem that needs a special approach when using LiveData.
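As a rough sketch of that idea, a domain-layer contract can expose a SharedFlow without a single Android import. SyncEvents, DatabaseSyncEvents, onRowsChanged() and the buffer size of 16 are all hypothetical choices made for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical domain-layer contract: pure Kotlin, no Android classes.
interface SyncEvents {
    val events: SharedFlow<String>
}

// Hypothetical data-layer implementation that reports database changes.
class DatabaseSyncEvents : SyncEvents {
    // The extra buffer lets tryEmit succeed without suspending the caller.
    private val _events = MutableSharedFlow<String>(extraBufferCapacity = 16)
    override val events: SharedFlow<String> = _events.asSharedFlow()

    fun onRowsChanged(table: String) {
        _events.tryEmit("changed:$table")
    }
}

fun collectTwoEvents(): List<String> = runBlocking {
    val source = DatabaseSyncEvents()
    val received = mutableListOf<String>()
    // UNDISPATCHED makes sure the collector is subscribed before anything is emitted;
    // with replay = 0, events emitted earlier would be lost.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        source.events.take(2).toList(received)
    }
    source.onRowsChanged("users")
    source.onRowsChanged("orders")
    job.join()
    received
}

fun main() {
    println(collectTwoEvents()) // [changed:users, changed:orders]
}
```

Because replay is 0 here, each event is delivered only to collectors that are active at that moment, which is what makes this shape interesting for one-time events.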

September 2022 Edit: since this article was written, many articles have been published that try to use Flow for handling the SingleLiveEvent case. One of the best I’ve read is this one written by Michael Ferguson. It recommends using Channels rather than hot flows to handle your data streams. However, Google’s guidelines have recently changed, and it is now advised to model one-time events as part of the state, which can then be observed by the UI. To be honest, I still see some valid use cases for good old SingleLiveEvent, but to be up to date with the guidelines you may want to read this article by Manuel Vivo.

I’ve been using SharedFlow and StateFlow in my latest project for about 7 months now, and they’re really fun to use!

Have you already tried SharedFlow or StateFlow? What are your thoughts about them?

I know that I finally can stop using RxJava and just love the Flow 😀

Originally published at https://angrynerds.co/blog/ on April 29, 2021.
