Eager vs. Lazy in RxJava

Over the last several years, RxJava has become a staple of Android development. It has a steep learning curve, partly because you are really learning two different things at once. The first is applying operators to a stream of values, which covers the routine RxJava tasks most people are used to:
```kotlin
Observable.just(1, 2, 3, 4, 5)
    .map { it + 5 }
    .subscribe(::println) // Prints 6, 7, 8, 9, 10
```
The other half of the RxJava equation is the asynchronous threading model. This is where you need to understand the difference between eager and lazy evaluation, and to know a bit about how RxJava operates under the hood. In this post, I would like to build up an example of eager vs. lazy evaluation and show why it pays to double-check how RxJava behaves in different circumstances.
Definitions
First, some definitions. Eager evaluation means that when you assign an expression to a variable, the expression is evaluated immediately and the variable’s value is set. This leads to straightforward, deterministic behavior in the software we write, and it is the default in most programming languages, such as Java, Kotlin, and C. Lazy evaluation, on the other hand, evaluates an expression only at the last possible moment before its value is needed. If you never access the value, the code that determines it is never run. This is the default in Haskell, while most other languages, Kotlin included, offer laziness only as an opt-in feature. With respect to RxJava, it is sufficient to say that eager operators try to do things as soon as possible, while lazy operators tend to wait until the very last moment.
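Here is a minimal sketch of the distinction in plain Kotlin, outside of RxJava. It uses the standard library’s lazy delegate for the lazy case. The names compute, trace, and demo are illustrative only. The trace records the moment each expression actually runs.

```kotlin
// Records the moment each expression is actually evaluated.
val trace = mutableListOf<String>()

// Hypothetical helper: the side effect marks when evaluation happens.
fun compute(label: String): Int {
    trace += label
    return label.length
}

fun demo(): List<String> {
    trace.clear()
    val eager = compute("eager")                 // eager: runs right here
    val deferred by lazy { compute("lazy") }     // lazy: nothing runs yet
    val neverRead by lazy { compute("never") }   // lazy and never accessed: never runs
    trace += "after assignments"
    val total = eager + deferred                 // first access runs compute("lazy")
    trace += "total=$total"
    return trace.toList()
}

fun main() {
    println(demo()) // [eager, after assignments, lazy, total=9]
}
```

The "never" label does not appear in the trace at all. Its initializer is skipped entirely because nothing ever reads neverRead.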
Minimizing Requests
Let’s imagine a situation where we receive multiple requests to call a network service in quick succession. While we are processing one request, we want to ignore any new ones. Such cases come up when downloading a configuration feed, uploading some sort of data, and so on.
Our first intuition might be to use debounce. However, debounce waits until the source has been quiet for the full length of the given timeout before emitting anything, and only then emits the most recent item it received. This is the opposite of what we want, as we would prefer our network request to start as soon as possible.
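The difference is easier to see on a timeline. The sketch below is a simplified, single-threaded model, not RxJava’s implementation. It assumes the source never completes, as with real button presses. It compares debounce with a hypothetical leadingEdge filter that emits the first press immediately and ignores the rest of the burst. Event, debounce, and leadingEdge are illustrative names.

```kotlin
// A simulated button press: when it happened (ms) and the value it carries.
data class Event(val timeMs: Long, val value: Int)

// Debounce model: an event survives only if no newer event arrives within
// timeoutMs, and it is emitted timeoutMs after it occurred.
fun debounce(events: List<Event>, timeoutMs: Long): List<Event> =
    events.filterIndexed { i, e ->
        val next = events.getOrNull(i + 1)
        next == null || next.timeMs - e.timeMs >= timeoutMs
    }.map { Event(it.timeMs + timeoutMs, it.value) }

// Leading-edge model (the behavior we want): emit the first event at once,
// then ignore everything until windowMs has passed since that emission.
fun leadingEdge(events: List<Event>, windowMs: Long): List<Event> {
    val out = mutableListOf<Event>()
    var openAt = Long.MIN_VALUE
    for (e in events) {
        if (e.timeMs >= openAt) {
            out += e
            openAt = e.timeMs + windowMs
        }
    }
    return out
}

fun main() {
    // Five presses 10ms apart, starting at t = 10ms.
    val presses = (0 until 5).map { Event(10L * (it + 1), it) }
    println(debounce(presses, 100))    // [Event(timeMs=150, value=4)]
    println(leadingEdge(presses, 100)) // [Event(timeMs=10, value=0)]
}
```

Debounce produces a single output 100ms after the last press, carrying the last value. The leading-edge filter fires at the very first press, which is what we want for a network request.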
One approach to solving this problem is to use a Flowable and an appropriate backpressure strategy. We can “fake” 5 button presses spaced 10ms apart using interval followed by take. We also choose a backpressure strategy by adding onBackpressureDrop.
Backpressure occurs when the downstream (everything below the call to onBackpressureDrop) cannot process the values emitted by the upstream (everything above the call to onBackpressureDrop) fast enough. In our case, we specified that the excess values should be dropped. We could also buffer them, signal an error, or keep only the latest value, matching the options in the BackpressureStrategy enum.
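The rule that onBackpressureDrop applies can be sketched as a small demand counter. This is a simplified, single-threaded model, not RxJava’s source. The downstream grants demand through request(n), as a Reactive Streams Subscription does. Each item pushed from upstream is delivered only while demand remains, and is dropped otherwise. Following the Reactive Streams convention, a request of Long.MAX_VALUE is treated as unbounded. DropOnNoDemand is a hypothetical name.

```kotlin
// Simplified, single-threaded model of the onBackpressureDrop rule (not RxJava's code).
class DropOnNoDemand<T> {
    private var requested = 0L          // outstanding downstream demand
    val delivered = mutableListOf<T>()
    val dropped = mutableListOf<T>()

    // Downstream grants demand, like Subscription.request(n).
    fun request(n: Long) {
        require(n > 0) { "request must be positive" }
        // Saturate at Long.MAX_VALUE, which means "unbounded".
        requested = if (Long.MAX_VALUE - requested < n) Long.MAX_VALUE else requested + n
    }

    // Upstream pushes an item whether or not anyone asked for it, like interval.
    fun onNext(item: T) {
        if (requested > 0) {
            delivered += item
            if (requested != Long.MAX_VALUE) requested--  // unbounded demand is never used up
        } else {
            dropped += item                               // no demand: drop it
        }
    }
}

fun main() {
    val oneAtATime = DropOnNoDemand<Int>()
    oneAtATime.request(1)
    (0 until 5).forEach(oneAtATime::onNext)
    println("${oneAtATime.delivered} ${oneAtATime.dropped}") // [0] [1, 2, 3, 4]

    val unbounded = DropOnNoDemand<Int>()
    unbounded.request(Long.MAX_VALUE)
    (0 until 5).forEach(unbounded::onNext)
    println("${unbounded.delivered} ${unbounded.dropped}")   // [0, 1, 2, 3, 4] []
}
```

The unbounded case is worth remembering. If the operator right after onBackpressureDrop requests Long.MAX_VALUE, the drop strategy never gets a chance to drop anything.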
```kotlin
Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
```
Now, let’s imagine we have some long-running task we want to perform on each emission. Because we are performing network IO (simulated here with Thread.sleep), we’ll be sure to do this work on the IO thread pool.
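```kotlin
// Assumes RxJava 2 (RxJava 3 uses io.reactivex.rxjava3.* packages)
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

fun webRequest(i: Long) = Single.create<Long> {
    println("Preparing $i")
    Thread.sleep(100) // simulated network latency
    it.onSuccess(i)
}.subscribeOn(Schedulers.io())
```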
We can bring this into our stream like so:
```kotlin
Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
    .flatMapSingle(::webRequest)
    .subscribe { println("Got $it") }
```
Great! We can subscribe and away we go…
```
Preparing 2
Preparing 3
Preparing 0
Preparing 1
Preparing 4
Got 4
Got 2
Got 3
Got 0
Got 1
```
Whoops! Something’s wrong. The results arrive out of order, because flatMapSingle runs the Singles concurrently on the IO thread pool. Every press was also processed, instead of the extra presses being dropped. For one thing, we never actually told our stream that we only want to process a single value at a time. We can try to use a ResourceSubscriber for this, calling its request method. We’ll change our subscribe call to look like this:
```kotlin
.subscribe(object : ResourceSubscriber<Long>() {
    override fun onStart() {
        println("Start")
        request(1)
    }

    override fun onComplete() {
        println("Completed")
    }

    override fun onNext(t: Long?) {
        println("Got $t")
        request(1)
    }

    override fun onError(t: Throwable?) {
        error("This should never happen")
    }
})
```
Ok, so now we run again…
```
Start
Preparing 0
Preparing 1
Preparing 2
Preparing 3
Preparing 4
Got 1
Got 0
Got 2
Got 4
Got 3
Completed
```
Oh no. Our output looks essentially the same. So what is happening? Notice how all of the Preparing lines are at the top of our output. This means that every Preparing statement runs before any of our results reach the onNext override in our ResourceSubscriber. That’s a problem, because it implies that every time the user hits our fabricated button, we make a network request.
There is a handy method called doOnRequest that will help show us why we’re getting more data than expected. By inserting calls to doOnRequest on either side of our flatMapSingle, we can get some insight into what is going on.
```kotlin
    .doOnRequest { println("A: $it") }
    .flatMapSingle(::webRequest)
    .doOnRequest { println("B: $it") }
```
Our output looks like so:
```
Start
B: 1
A: 9223372036854775807
Preparing 2
Preparing 0
Preparing 1
Preparing 3
Preparing 4
Got 2
B: 1
Got 0
B: 1
Got 1
B: 1
Got 3
B: 1
Got 4
B: 1
Completed
```
Ok, that is a large number: 9223372036854775807 is Long.MAX_VALUE, which by Reactive Streams convention means an unbounded request. It turns out that flatMapSingle is very eager to get some work done. It is important to remember that every operator in RxJava subscribes to the operator above it. In the case of Flowables, this subscription includes a call to request to pull data from the upstream. By default, flatMapSingle simply asks for as much data as possible. To restrict this, we can use an overload of flatMapSingle:
```kotlin
    .flatMapSingle(::webRequest, false, 1)
```
Here we are telling flatMapSingle that we only want 1 item in flight at a time. That is the maxConcurrency argument; the false is the delayErrors flag. We end up with the following output:
```
Start
B: 1
A: 1
Preparing 0
Got 0
B: 1
Completed
```
This looks a lot better, and is what we really wanted in the first place. The first press starts a request, and the other four arrive while it is still in flight, so onBackpressureDrop discards them. In fact, we can simplify our example. Originally, we were trying to restrict our data load with a ResourceSubscriber. This is not necessary. Because we have told flatMapSingle to accept only one item at a time, we can eliminate the custom subscriber and use a simpler subscribe call.
```kotlin
Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
    .flatMapSingle(::webRequest, false, 1)
    .subscribe { println("Got $it") }
```
There are similar overloads for other flatMap methods, such as Flowable.flatMap(mapper, maxConcurrency), which you can use to get similar behavior. The takeaway here is that you should look for opportunities to restrict the flow of data and decide on a backpressure strategy as early as possible. It will lead to less processing in the long run.
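The following rough discrete-event model shows why a maxConcurrency of 1 lets only the first press through. It is written in plain Kotlin rather than RxJava and makes three assumptions. First, flatMapSingle initially requests maxConcurrency items from upstream, or Long.MAX_VALUE when maxConcurrency is left at an Int.MAX_VALUE default. Second, it requests one more item whenever an inner Single finishes. Third, onBackpressureDrop discards any tick that arrives with no outstanding demand. The function startedTicks and its parameters are hypothetical.

```kotlin
// Rough model (not RxJava's implementation) of onBackpressureDrop feeding
// flatMapSingle. Returns the indices of the ticks that start a request.
fun startedTicks(
    ticksMs: List<Long>,                // arrival time of each tick
    workMs: Long,                       // how long each inner Single takes
    maxConcurrency: Int = Int.MAX_VALUE
): List<Int> {
    val unbounded = maxConcurrency == Int.MAX_VALUE
    // Initial upstream request: maxConcurrency, or Long.MAX_VALUE if unbounded.
    var demand = if (unbounded) Long.MAX_VALUE else maxConcurrency.toLong()
    val inFlightEnds = mutableListOf<Long>()   // finish times of running requests
    val started = mutableListOf<Int>()

    ticksMs.forEachIndexed { index, t ->
        // Each request that has finished by now triggers request(1) upstream.
        val finished = inFlightEnds.count { it <= t }
        inFlightEnds.removeAll { it <= t }
        if (!unbounded) demand += finished

        if (demand > 0) {
            started += index
            inFlightEnds += t + workMs
            if (!unbounded) demand--
        }
        // Otherwise there is no outstanding demand and the tick is dropped.
    }
    return started
}

fun main() {
    val ticks = (1..5).map { it * 10L }                            // 10, 20, 30, 40, 50 ms
    println(startedTicks(ticks, workMs = 100))                     // [0, 1, 2, 3, 4]
    println(startedTicks(ticks, workMs = 100, maxConcurrency = 1)) // [0]
    println(startedTicks(ticks, workMs = 5, maxConcurrency = 1))   // [0, 1, 2, 3, 4]
}
```

With unbounded demand every tick starts a request, mirroring the five Preparing lines from the first runs. With maxConcurrency = 1 only the first tick gets through while a 100ms request is in flight. The last call shows that nothing is lost when the work finishes before the next press arrives.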
Kotlin: {} vs. ()
Another frequent source of wasted hours with RxJava in Kotlin is the difference between {} and () when invoking operators. This next example should shed some light on it.
Consider the function createCompletable, which creates a Completable that, when subscribed to, prints the given label, sleeps for 100ms, and then completes.
```kotlin
fun createCompletable(label: String) =
    Completable.create {
        println("Created Completable $label")
        Thread.sleep(100)
        it.onComplete()
    }
```
We can construct a chain of Completable objects using andThen. One implementation that compiles looks like this:
```kotlin
createCompletable("A")
    .andThen { createCompletable("B") }
    .subscribe()
```
However, when we run this, all we see is:
```
Created Completable A
```
What happened to B? Well, we used {} instead of (). Kotlin has syntactic sugar that lets you move a lambda outside of a method’s () when it is the last argument. The confusion comes in because the andThen overload chosen here takes a SAM interface, CompletableSource, so our lambda is converted into a CompletableSource. That interface has a single method with the signature (CompletableObserver) -> Unit. Because this method returns nothing, the value of the lambda’s last expression is simply discarded. And since our lambda never calls onComplete on the observer it receives, the combined Completable never completes either.
In our example, the lambda does call createCompletable("B"), but the Completable it returns is never subscribed to, so its body never runs. RxJava is lazy by default. Creating a Completable, Single, Maybe, etc. only describes how to process a stream; it does not begin execution. The stream is not created and the operators are not invoked until some interested party subscribes. We can fix our case in one of two ways. The first is to use the lambda’s argument, the CompletableObserver, to subscribe to the inner Completable.
```kotlin
createCompletable("A")
    .andThen { createCompletable("B").subscribe(it) }
    .subscribe()
```
The second is to use () instead of {}, since a Completable is a CompletableSource.
```kotlin
createCompletable("A")
    .andThen(createCompletable("B"))
    .subscribe()
```
In both cases the Completable returned by createCompletable("B") is properly subscribed to and run. This results in the expected output:
```
Created Completable A
Created Completable B
```
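The whole mechanism can be reproduced without RxJava. In the sketch below, Source is a hypothetical fun interface standing in for CompletableSource, and andThen is a simplified stand-in for Completable.andThen. Neither is RxJava’s real code, and fun interface requires Kotlin 1.4 or later. The sketch runs the braces version and both fixes, recording which bodies ran and whether the chain completed.

```kotlin
// Hypothetical stand-in for CompletableSource: one abstract method, so a
// Kotlin lambda can be SAM-converted into it.
fun interface Source {
    fun subscribe(onComplete: () -> Unit)
}

val events = mutableListOf<String>()

// Like createCompletable: the body runs only when someone subscribes.
fun source(label: String) = Source { onComplete ->
    events += "ran $label"
    onComplete()
}

// Like andThen(CompletableSource): when this completes, subscribe to next.
fun Source.andThen(next: Source) = Source { onComplete ->
    this@andThen.subscribe { next.subscribe(onComplete) }
}

// Subscribes to a chain; reports which bodies ran and whether it completed.
fun execute(chain: Source): Pair<List<String>, Boolean> {
    events.clear()
    var completed = false
    chain.subscribe { completed = true }
    return events.toList() to completed
}

fun main() {
    // Braces: the lambda itself becomes the Source. source("B") is built and
    // discarded, and onComplete is never called.
    println(execute(source("A").andThen { source("B") }))               // ([ran A], false)
    // Fix 1: subscribe the inner Source with the callback we were handed.
    println(execute(source("A").andThen { source("B").subscribe(it) })) // ([ran A, ran B], true)
    // Fix 2: pass the Source value itself in parentheses.
    println(execute(source("A").andThen(source("B"))))                  // ([ran A, ran B], true)
}
```

The braces version reports that only A ran and that the chain never finished. Both fixes run A and then B and complete normally.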
Conclusions
RxJava is a massive, complex library. It is trying to do a lot of different things, and sometimes its helpful nature can hinder you in unexpected ways. Being able to separate creation from execution, and knowing the tools that reveal when RxJava is not being as lazy as you would like, are key to a happy, successful experience. I hope this post was able to clear up some of the ambiguity.