ProAndroidDev

The latest posts from Android Professionals and Google Developer Experts.

Follow publication

Android Unidirectional Data Flow — Kotlin Flow vs. RxJava

The network, database, and view states

Adam Hurwitz
ProAndroidDev
Published in
12 min readMay 19, 2020

One of the many great things about the Unidirectional Data Flow (UDF) to organize an app’s logic is that it works with any reactive programming pattern, whether it be Kotlin coroutines Flow (Flow), ReactiveX (Rx) based RxJava/Kotlin, or LiveData. Kotlin coroutines version 1.0 was released at the end of 2018 and anecdotally has quickly been gaining adoption, alongside functionality. However, a large portion of apps have been built pre-late 2018 and rely on Rx. This makes both Flow and Rx important tools to be equipped with when working on a diverse set of codebases.

Spoiler — Kotlin Flow requires less setup than RxJava, manages the lifecycle by default, handles threading cleanly, avoids nesting, allows customization with extensions, and is relatively easy to use.

CryptoTweets sample code

Previous

Overview

Recap

Additional setups required

Data layer

View states

Summary — Advantages of Kotlin Flow and RxJava

Recap

UDF Structure

🏗 UDF Structure

  1. The view creates events initiated by the user or Android lifecycle.
  2. The ViewModel requests data from the repository layer.
  3. The repository returns data from the network/database.
  4. The ViewModel creates/updates the view state, comprising the UI, and effects, which are one-time events like navigation or error-handling.
  5. The view observes the view states/effect changes.

💪🏻 UDF Strengths

  • Linear flow of app logic with events initiating the data returned
  • Control UI & async events
  • Pre-defined test structure
  • Quicker debugging

Additional setups required

Photo by Jeremy Bishop on Unsplash

In addition to implementing the core libraries for Retrofit, Room, and the Paging tools below, additional setup is required for Flow and Rx.

Core functionality

🌊️️️ Flow — Add kotlinx.coroutines

  • When creating a new project in Android Studio select Kotlin as the language under the Configure your project step.
  • For creating/updating the view state and effects, in addition to Flow, the kotlinx.coroutines library, starting with release 1.3.6, provides StateFlow and MutableStateFlow similar to LiveData’s immutable and mutable variants.

⛓️ Rx — Requires two additional libraries

  • RxKotlin library for standard Rx functionality including RxJava
  • RxAndroid library for Android-specific functionality like using the main thread with AndroidSchedulers.MainThread

Retrofit network requests

🌊 ️️Flow — Works out of the box

Rx — Add RxJava adapter to Retrofit.

  • There are different RxJava adapter libraries based on the RxJava version number, as outlined in this StackOverflow post.

Room SQL database and Paging library

️🌊 Flow — Requires two additional libraries

⛓️ Rx — Requires two additional libraries

See: Room 🔗 RxJava

Additional setups summary

In addition to the normal Retrofit, Room, and Paging setups, Flow requires fewer libraries compared to Rx.

build.gradle(:someModule)

🌊 Flow

⛓️ Rx

Data layer — Retrofit network requests

Photo by Scott Webb on Unsplash

Being able to handle both one-time requests and constant streams of information from the network are key capabilities of reactive programming.

Request return type

The type required for Flow, of a List, is simpler than using an Observable. Flow’s functions are slightly more complicated requiring the suspend modifier. However, this modifier allows Retrofit to handle background threading by default, as opposed to explicitly defining the threading when the Flow is launched. Also, returning a list is useful to handle information more easily as we’ll also see in the Check response status section below.

FeedService.kt

🌊 Flow

⛓️ Rx

Backpressure

The sample in CryptoTweets showcases a simple one-time request. However, it is worth mentioning more advanced cases in which Flow and Rx have the ability to handle constant streams of data that may lead to backpressure. Backpressure is when there is an ongoing and incoming stream of data that needs to be processed. For instance, this can be from a web-socket type network request that maintains a constant connection. Managing backpressure requires processing the incoming data without causing issues such as slowing down the UI, crashing the app, or losing important information.

🌊 Flow

Coroutines handle backpressure by default with the flow/collect pattern by managing data streams synchronously. Flow by default performs sequentially. The code runs one emission from within the source flow, then one collection is run. This process continues until all of the data is processed.

You would not see any code that explicitly handles back-pressure, because it automatically happens behind the scenes due to the support for suspension that is provided by Kotlin compiler. — Roman Elizarov (Kotlin Team Lead), Reactive Streams and Kotlin Flows

Coroutines offer further strategies to handle backpressure and improve the speed of which data is processed.

  • buffer — Runs the source data concurrently. If the collector is slow the data inside the source can still be pre-processed or manipulated so that when the collector is ready the data can be returned faster than if the source data was processed sequentially by default.
  • conflate —This speeds up returning data by only returning the latest value, thus not returning all of the data from the source.
  • collectLatest — Slow collectors are cancelled and restarted each time new data is emitted to increase the speed of data returned.

⛓️ Rx

By default, an Observable will not manage backpressure. If there is an existing source emitting data, and then is later observed, the data prior to the observation/subscription will not be processed. Instead, Rx handles backpressure using a Flowable which defines the strategy to handle backpressure.

  • BackpressureStrategy.BUFFER — Allows the source to hold the data until the subscriber can consume it, similar to what coroutines do by default
  • BackpressureStrategy.DROP — Discards events that cannot be consumed in time by the subscriber.
  • BackpressureStrategy.LATEST — Overwrites previous data with new data if the subscriber is behind. This is most similar to coroutines’ conflate, because conflate skips intermediate values, but does not explicitly remove or drop them.

See: RxJava 2 — Flowable, Baeldung

Check response status

After receiving information from a network request, doing a quick check on the status of the information is often required.

🌊 Flow

Because the threading and lifecycle are defined at the initialization of the flow as shown in the Process data section below, the tweetsResponse may simply be checked for emptiness.

FeedRepository.kt

⛓ ️Rx

This requires subscribing to to the data and defining the threading in order to do a basic check, like whether the data is empty.

  1. subscribeOn — The thread that will launch the process
  2. observeOn — The thread the data will be returned on.
  3. subscribe — Returns the data. This pattern also creates a layer of nested code.

FeedRepository.kt

Process data

🌊 Flow

When the Flow is initialized the lifecycle is defined with launchIn. The viewModelScope lifecycle inherently defines the Dispatchers.Main.immediate thread for data to be returned on. The lifecycle is automatically managed by the Android ViewModel using viewModelScope, and will be destroyed when the ViewModel is destroyed. Returning data on the main thread is appropriate because the the view state and UI are being populated/updated.

The logic within initFeed performs Retrofit and Room database tasks which if configured properly handle background threading by default. In other cases, flowOn needs to be defined explicitly before launchIn. i.e. flowOn(Dispatchers.IO) since the IO thread is appropriate for network and database tasks.

FeedViewModel.kt

Side note — The difference between Dispatchers.Main.immediate and Dispatchers.Main, as Craig Russell explains in Launching a Kotlin Coroutine for immediate execution on the Main thread, is Dispatchers.Main requires a check first to ensure it is on the main thread, whereas Dispatchers.immediate assumes it is on the main thread and performs right away.

⛓️ Rx

  1. The threading is defined with the same methods as in Check response status above.
  2. CompositeDisposable — Manages the Observable lifecycle. By adding the Observable to a CompositeDisposable, all of the created Observables can be cleared in the view activity/fragment onDestroy method by calling the public method in the ViewModel clearAndDisposeDisposables.

FeedViewModel.kt

Data layer — Room database with Paging library

Photo by David Clode on Unsplash

Inserting and querying data from the DAO

🌊 Flow

Both use a DataSource.Factory in the data access object in order to insert and query data from the Room SQL database. Flow utilizes the suspend modifier in order for Room to handle background threading by default. Again, this is opposed to explicitly defining the threading when the Flow is launched.

Rx

FeedDao.kt

Creating a PagedList

🌊 Flow

  1. getAllTweets — The FeedDao queries for data and the extension function toLiveData builds LiveData of type PagedList. toLiveData handles threading on an IO background thread.
  2. asFlow — Converts this into a Flow of type PagedList bringing the data back onto a Main thread.
  3. tweetsQuery — Data can be collected using collect and emitted using emit to the ViewModel which initiated the repository data request.

Additional threading management could be added using withContext if further processing is required on other threads. Otherwise, the threading has already been defined when this method was initiated in the ViewModel above.

FeedRepository.kt

⛓️ Rx

  1. The process here is similar, but using toObservable extension function to convert the query directly to an observable instead of to LiveData first.
  2. Threading is managed twice, first off in the ViewModel when building the view states and effects and initializing the repository request, and secondly here in the repository when processing the data.

FeedRepository.kt

View states — States + effects

Photo by Vino Li on Unsplash

Regardless of the reactive implementation for the view state, (Flow, Rx, LiveData, etc.), there are both view states and effects to update in the ViewModel that create the user experience observed in the view, (activity/fragment).

  • View states — Final persisted data of a screen that is displayed continuously to the user
  • View effects — One-time occurrences that don’t persist such as errors, dialogs, and navigation actions

View state

🌊 Flow

  1. lifecycleScopeHandles threading by default with lifecycleScope, returning the UI data onto the Dispatchers.Main.immediate thread the same as viewModelScope above.
  2. MutableStateFlow — Sets and updates the view state data in the ViewModel with a private mutable data class.
  3. Flow— Updates the public non-mutable view state that is observed by the view, in this case a fragment. The view effects follow the same pattern.
  4. filterNotNull — Allows the view state to be initialized as null when the ViewModel is created without emitting null values to the view.

FeedViewState.kt

FeedViewModel.kt

FeedFragment.kt

⛓️ Rx

  1. BehaviorSubject — Rx Subjects are both an Observer and an Observable, allowing their data to be mutated and observed. BehaviorSubject is a great fit for the view state and effects because the most recent value is emitted to the observers, and then any new values added during the lifecycle of the subscription are also emitted. The original design of the Rx view state in this sample used a ReplaySubject. All values are added to a ReplaySubject are emitted to its’ subscribers. With ReplaySubject, past values could potentially update the view state that are outdated. The Subject type used depends case-by-case.
  2. All observers in the fragment must be cleared too in addition to clearing and disposing of the CompositeDisposable in the ViewModel above. With LiveData and Flow, the lifecycle is handled by default. This is a tradeoff if deciding between LiveData and Rx because LiveData handles lifecycle by default, whereas Rx allows specific emission strategies by default.

See: Exploring RxJava in Android — Different types of Subjects, Anitaa Murthy

FeedViewState.kt

FeedViewModel.kt

FeedFragment.kt

View effects

The issue with any reactive programming pattern for one-time events is that they will be re-observed by the subscriber after the initial one-time event has been emitted. Jose Alcérreca describes the SingleLiveEvent case in the context of LiveData. Both the issue and solution based on an event wrapper pattern can be applied to any reactive pattern.

For reach reactive pattern the type contained within the reactive object will be wrapped in an event. You can test this out by subscribing to a one-time event multiple times after emitting it once in the ViewModel and logging the value before and after implementing the Event.

Event.kt

🌊 Flow

The view effect is similar to the view state with the only difference being the Event wrapper class.

FeedViewEffect.kt

FeedViewModel.kt

It is easy to create customized utility functions with Kotlin. The one-time event is a perfect use case. Instead of manually checking in the view whether the event has already been emitted with getContentIfNotHandled from the Event class, this is performed automatically in a custom utility function onEachEvent, customizing the coroutines’ Transform.kt onEach extension function.

OnEachEvent.kt

FeedFragment.kt

⛓️ Rx

FeedViewEffect.kt

The same Event class is used as the BehaviorSubject type.

FeedViewModel.kt

FeedFragment.kt

getContentIfNotHandled manually checks to see if the event has already been observed.

subscribe is written in Java and cannot be created as easily as the onEachEvent extension function for Kotlin Flow. I’m open to pull requests if you’d like to customize subscribe. Good luck.

Morgan Freeman Good Luck GIF

⚖️ Summary — Advantages of Kotlin Flow and RxJava

Photo by 贝莉儿 DANIST on Unsplash

In cases working with legacy code or teams not yet onboard with Kotlin Flow, it is important to be able to write well structured code with Rx so that all of the developers working on common code adopt a similar strategy. Given the opportunity to create a new app/feature, Kotlin Flow has a short learning curve, built-in integrations with Android components and widely adopted open-source libraries, and expanding it’s use cases rapidly.

🌊 Flow

  • 🏎️ Quick setup — Requires two fewer libraries compared to Rx.
  • ♻️ Lifecycle management — Inherently controls the lifecycle of the operation upon launch.
  • 🧵 Threading management — Can be handled in one place when the flow is launched rather than each time data is observed. Additional customization can be added if needed. This results in cleaner and simpler code.
  • 👻 Avoid nesting nightmares — The callback style pattern is harder to work with and creates the need to initialize instance variables within a function to retrieve information from within nested logic, like when creating the PagedList above.
  • 🎨 Customization — The core classes, methods, and inherited classes and interfaces are readable and easy to understand the existing features in order to edit or add functionality with extension functions.
  • 🚦 Ease-of-use — It is easy to learn new topics via reading the good ol’ documentation. The docs provide interactive examples for almost every topic to test, iterate, break, and understand. Does anyone truly understand the marbles diagrams in Rx? 🤔

⛓️ Rx

  • 🧑🏽‍🏭Mass adoption — Most apps built pre-2018 use Rx.
  • ⚔️️Battle tested APIs — Kotlin Flow contains many experimental APIs, indicated by@ExperimentalCoroutinesApi, which is temporary as the libraries mature.
  • ⚙️Larger selection of operators — In CryptoTweets the data stream is simple, but in more complex cases, due to the maturity of Rx, there might be more advanced operators/transformations not yet available with Kotlin coroutines and Flow. For both CryptoTweets and the beta app I’ve published to the Play Store, Coinverse, I have not run into such cases where coroutines have not been able to do the job.

I’d love to hear in the comments below if there are operators you’ve come across for RxJava that are not yet available for Kotlin coroutines/Kotlin Flow.

Thus far, Andrey Bolduzev has pointed out Rx’s distinct operator for filtering only unique values for an entire subscription’s stream as outlined here.

Game Of Thrones Good Luck GIF

Thank you to Roman Elizarov for the feedback regarding Kotlin StateFlow and Andres Rubiano Del Chiaro with RxJava! 🙏🏻 Please leave claps if you liked the above. The longer the clap button is pressed the more claps. 👏🏻

I’m Adam Hurwitz, writer of code and more. Follow me on Medium.

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Published in ProAndroidDev

The latest posts from Android Professionals and Google Developer Experts.

Written by Adam Hurwitz

An account about nothing | Researcher and product consultant

Responses (3)

Write a response