
Loading entities from database and server API by using RxJava
Your app may have some repositories to handle requests of database and server API, but did you ever had trouble to describe this process by using RxJava 2? I used “describe” because RxJava, as a functional programming tool, meant to be non-imperative, which makes it a little bit harder to manage, but after you achieve the solution you know your solution is more human readable, elegant and transportable to other platforms because you wrote it in the universal language of mathematics(Category theory, morphism, functor, monad and etc.) and you can see the beauty of it.
Old Way
In this manner, Mert Şimşek wrote a post here. The main function of his solution is this:
But in his scenario, the Observable
become completed after the response of the network, which is not what we want as the Room database provides Flowable
s that doesn’t complete to deliver more updates of the data. Here we want to support this advantage of keeping the pipe open and delivering updated data.
The Scenario
We want to load data from database, then after the first response of the database we need to request to the API and in the end we need to keep the pipe open for more possible updates of the data. To achieving this we just need to avoid to call onComplete
method of observers. Having this in mind and knowing that the stream from the database is not completing, we would always be emitting updates from the database, including when we fetch from API. In order to achieve that, when we get an API response, instead of propagating this update directly, we will persist it and let the database be responsible for the emit. Additionally, after the response of database, we need to know there is a work on progress, so UI shows a loading until the response of the API is coming back.
Our Way
First of all, we assume every streams are Flowable
s instead of Observable
s. This is a proper assumption because the Room database cannot emit an Observable
, plus in this way we have the advantage of handling backpressure by Flowable
. Second, for this scenario we need to define ResultState
class as follows.
Here we assume the Loading
state is emitted by the returning Flowable
when the response of database is arrived, then the UI knows to keep showing a loading. After the response of server API is coming back, we persist it in the database and the database emits the new update, which we have to wrap it into a Success
state, so UI knows the loading is finished. After that if by any unknown process the data in the database gets updated the returning Flowable
emits Success
state with updated data. While we do these processes if any error happened the Flowable
emits Error
state with the last available data, which was cached. We keep the last data in Error
state because LiveData
can cache just one value that emits through it and UI needs every data to restore its state.
The BaseRepository
In clean architecture of our app, each of repositories in the data layer is responsible for one Entity in the domain layer as follow.
You can see in this BaseRepository
class that we have a method named perform
which is responsible for loading data from database and network. But you can see in the arguments of perform
method that database is giving us a list of entities, which in the first glance looks like a problem, but after reading Room and the Mystery of the Lost Events post you’ll understand that for avoiding lost of events we need to use list of entities for just one entity(tip: you can think of this list just like an Optional<T>
, which unfortunately is just supported from Android API 24, so we cannot use it in our base class.)
The overall behaviour of this function is tested here, and we’ll come back to it soon.
The perform
method starts with val cachedData = AtomicReference<D?>()
line. As you know, here we have a Flowable
and a Single
in the arguments of this method, so we have to consider multi-threaded streams, then we use AtomicReference
to support it. Also cachedData
variable is responsible for caching the emitted value through the database Flowable
.
The next line is
val processor = PublishProcessor.create<ResultState<D>>()
which we’ll use it as Flowable
without an end. As we described the scenario that we want to support, we need to have a Flowable
that never calls the onComplete
method of its observers, so we’ll use the processor
for this purpose.
val flowable = databaseFlowable.doOnNext {
cachedData.set(it.firstOrNull())
}.share()
On the above line we cache the emitted values of the database in cachedData
, then we create a ConnectableFlowable
by using the share
operator, so this makes us sure in the two calls of Flowable.merge
the lambda function of doOnNext
would invoke only once. To understand share
operator better you can see its related document here.
If we ignore the flatMap
s and onErrorResumeNext
method then the returning value of perform
would be
return Flowable.merge(flowable.take(1), flowable.skip(1))
This line just divides the steam to the first emitted value and the rest of them, then after treating them differently, merge them to one stream again. Here are the documentation of merge, take and skip operators.
.onErrorResumeNext{
concatJustFlowable(ResultState.Error(...), processor)
}
Above line converts all the errors in this stream to ResultState.Error
, so it makes us sure that onError
of observers of the returning Flowable
would never get called. It’s important for us because after calling onError
the stream would get closed, which is bad as long as we want to emit the updates of data to the UI. Here we use the combination of .onErrorResumeNext{ concatJustFlowable(..., processor)}
to exactly avoid the completion of the returning Flowable
.
If database emits its first value then the following code is running.
if (it.isEmpty()) {
handleNetSingle(netSingle, persist, cachedData) } else {
concatJustFlowable(
ResultState.Loading(it.first()),
handleNetSingle(netSingle, persist, cachedData)
)
}
This part of the code just checks if the database doesn’t have the value then just request to the server API, without emitting the Loading
state. Otherwise, if database found a value then emits Loading
state and request to the server API for updated value.
However, for the reset of the stream we just send Success
state by the following code, if the database is actually found an update value, so the list is not empty.
if (it.isNotEmpty()) {
concatJustFlowable(ResultState.Success(it.first()), processor)
} else {
processor
}
Also we have a method named concatJustFlowable
which is responsible for emitting a value and then let a Flowable
emits its values. The other method is handleNetSingle
which is responsible for creating a Flowable
from network Single
and persist the data. In case of error, this method needs to wrap error with
.onErrorReturn {
ResultState.Error(it, cachedData)
}
We need to handle errors of network and API here to avoid handling errors with Flowable.merge
method, which breaks its functionality.
Testing
As mentioned before, the test methods are here, so let’s briefly explain how they work. In most of these tests we have a database processor to mock the database behaviour like this
val database = PublishProcessor.create<List<EntityTest>>()
and also we have a subject to mimic the server API like this
val net = PublishSubject.create<EntityTest>()
Additionally, in most of these tests we have a persist
function like this
val persist = { d: EntityTest -> database.onNext(listOf(d)) }
which is responsible to emit the persisted entity to the database Flowable
, then after subscribing to the returning Flowable
like this
val testSubscriber = usecaseFlowable.test()
we start to emit database and network values and test what’s happening. In most of cases we want to assert the Flowable
is not competed or propagated a Throwable
.
Finally, I hope with this scenario you and users of your app feel better and have better experiences. In the end, I want to thank my team, including Reza Bigdeli, Dr. jacky and Elmira farahani, for their endeavour to achieve this result.