ProAndroidDev

The latest posts from Android Professionals and Google Developer Experts.

Eager vs. Lazy in RxJava

An Eager Beaver. https://upload.wikimedia.org/wikipedia/commons/6/6b/American_Beaver.jpg

Over the last several years, RxJava has been a staple of Android software development. RxJava has a steep learning curve, and part of that is because you are really learning two different things at once. The first is the application of operators to a stream of values. This covers the routine RxJava tasks that most people are used to.

Observable.just(1, 2, 3, 4, 5)
    .map { it + 5 }
    .subscribe(::println)
// Prints 6, 7, 8, 9, 10

The other half of the RxJava equation is that of the asynchronous threading model. Herein lies the need to understand the difference between eager and lazy evaluation, and to know a bit about how RxJava operates under the hood. In this post, I would like to build up an example of eager vs. lazy evaluation, and show why we might want to double check how RxJava works in different circumstances.

Definitions

First, it is best to define some terms. Eager evaluation means that when you assign an expression to a variable, the expression is evaluated immediately and the variable’s value is set. This leads to very straightforward, deterministic behavior in the software we write. This is the default in most programming languages such as Java, Kotlin, or C. On the other hand, lazy evaluation only evaluates at the last possible moment before the value is needed. If you never access the value, the code determining the value is never run. This is the default in languages like Haskell, and many other functional programming languages. With respect to RxJava, it is sufficient to say that eager operators try to do things as soon as possible, while lazy operators tend to wait until the very last moment.
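To make the distinction concrete outside of RxJava, here is a minimal sketch that uses only the Kotlin standard library. The compute function and the events list are hypothetical names invented for this illustration; the point is only to show when each expression actually runs.

```kotlin
// Records the order in which things happen so it can be inspected afterwards.
val events = mutableListOf<String>()

// Hypothetical stand-in for an expensive computation.
fun compute(label: String): Int {
    events.add("computed $label")
    return 42
}

fun demo(): List<String> {
    events.clear()
    val eager = compute("eager")             // eager: evaluated right here, at assignment
    val deferred by lazy { compute("lazy") } // lazy: nothing runs yet
    events.add("before first access")
    val sum = eager + deferred               // the first access triggers the lazy block
    events.add("sum = $sum")
    return events.toList()
}

fun main() {
    demo().forEach { println(it) }
    // computed eager
    // before first access
    // computed lazy
    // sum = 84
}
```

If the deferred value were never read, "computed lazy" would never appear in the list at all.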

Minimizing Requests

Let’s imagine a situation where we have multiple requests to a network service. While we are processing one request, we want to ignore any new requests. Such cases arise when downloading a configuration feed, uploading some sort of data, etc.

Our first intuition might be to utilize a debounce. However, a debounce will wait the full length of the given timeout before emitting any data. After this timeout, it will emit the most recent item it received. This is the opposite of what we want, as we would prefer our network request to start as soon as possible.
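The debounce behavior described above can be checked without real waiting by driving a virtual clock. This is only a sketch: it assumes RxJava 2 package names (io.reactivex; RxJava 3 moved them under io.reactivex.rxjava3), and the clicks subject and debounceDemo function are hypothetical names for this illustration.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Returns what the observer has seen just before and just after the timeout elapses.
fun debounceDemo(): List<List<Int>> {
    val scheduler = TestScheduler()            // virtual clock, no real sleeping
    val clicks = PublishSubject.create<Int>()  // hypothetical stream of button presses
    val observer = clicks
        .debounce(100, TimeUnit.MILLISECONDS, scheduler)
        .test()

    val snapshots = mutableListOf<List<Int>>()

    clicks.onNext(1)
    scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS)
    clicks.onNext(2)                           // restarts the 100 ms timer
    scheduler.advanceTimeBy(99, TimeUnit.MILLISECONDS)
    snapshots.add(observer.values().toList())  // nothing emitted so far
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS)
    snapshots.add(observer.values().toList())  // only the most recent item arrives
    return snapshots
}

fun main() {
    println(debounceDemo()) // [[], [2]]
}
```

The first press never reaches the observer, and the second only arrives once the full timeout has passed, which is why debounce is a poor fit when the request should start immediately.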

One approach to solving this problem is to utilize a Flowable and an appropriate backpressure strategy. We can “fake” 5 button presses spaced 10ms apart utilizing an interval followed up with a take. We also add a strategy for our backpressure, via onBackpressureDrop.

Backpressure occurs when the downstream (stuff below the call to onBackpressureDrop) cannot process the values being emitted from the upstream (stuff above the call to onBackpressureDrop) fast enough. In our case, we specified that those values should be dropped. We could also buffer, throw an error, or something else, as defined by the BackpressureStrategy enum.

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
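To see what dropping means in isolation, the following sketch swaps the timed interval for a synchronous Flowable.range and lets the downstream ask for a single item. It assumes RxJava 2 package names, and dropDemo is a hypothetical name. It relies on the onBackpressureDrop overload that accepts a callback for each discarded item.

```kotlin
import io.reactivex.Flowable

// Returns the items delivered downstream paired with the items that were dropped.
fun dropDemo(): Pair<List<Int>, List<Int>> {
    val dropped = mutableListOf<Int>()
    val subscriber = Flowable.range(0, 5)
        .onBackpressureDrop { dropped.add(it) } // called once per discarded item
        .test(1)                                // the downstream requests exactly one item
    return subscriber.values().toList() to dropped.toList()
}

fun main() {
    val (delivered, dropped) = dropDemo()
    println("Delivered: $delivered") // Delivered: [0]
    println("Dropped: $dropped")     // Dropped: [1, 2, 3, 4]
}
```

The operator itself requests everything from the upstream and simply throws away whatever arrives while the downstream has no outstanding demand.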

Now, let’s imagine we have some long-running task we want to perform on each emission. Because we are performing network IO, we’ll be sure to do this work on the IO threadpool.

fun webRequest(i: Long) = Single.create<Long> { emitter ->
    println("Preparing $i")
    Thread.sleep(100) // simulate a slow network call
    emitter.onSuccess(i)
}.subscribeOn(Schedulers.io()) // run the work on the IO thread pool

We can bring this into our stream like so:

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
    .flatMapSingle(::webRequest)
    .subscribe { println("Got $it") }

Great! We can subscribe and away we go…

Preparing 2
Preparing 3
Preparing 0
Preparing 1
Preparing 4
Got 4
Got 2
Got 3
Got 0
Got 1

Whoops! Something’s wrong. The results arrive out of order, and every item was processed, instead of one being handled while the rest are dropped. For one thing, we never actually told our stream that we only want to process a single value at a time. We can try to utilize a ResourceSubscriber to do this, invoking its request method. We’ll change our subscribe call to look like this:

.subscribe(object : ResourceSubscriber<Long>() {

    override fun onStart() {
        println("Start")
        request(1)
    }

    override fun onComplete() {
        println("Completed")
    }

    override fun onNext(t: Long?) {
        println("Got $t")
        request(1)
    }

    override fun onError(t: Throwable?) {
        error("This should never happen")
    }
})

Ok, so now we run again…

Start
Preparing 0
Preparing 1
Preparing 2
Preparing 3
Preparing 4
Got 1
Got 0
Got 2
Got 4
Got 3
Completed

Oh no. Our output looks essentially the same. So what is happening? Notice how all of the Preparing calls are at the top of our output. This means that every Preparing statement runs before any result is actually processed by our onNext override in our ResourceSubscriber. That’s a problem, because it implies that every time the user hits our fabricated button, we are making a network request.

There is a handy method called doOnRequest that will help show us why we’re getting more data than expected. By inserting calls to doOnRequest on either side of our flatMapSingle, we can get some insight as to what is going on.

.doOnRequest { println("A: $it") }
.flatMapSingle(::webRequest)
.doOnRequest { println("B: $it") }

Our output looks like so:

Start
B: 1
A: 9223372036854775807
Preparing 2
Preparing 0
Preparing 1
Preparing 3
Preparing 4
Got 2
B: 1
Got 0
B: 1
Got 1
B: 1
Got 3
B: 1
Got 4
B: 1
Completed

Ok, that is a large number. It turns out that flatMapSingle is very eager to get some work done. It is important to remember that every operator in RxJava creates an internal subscription. In the case of Flowables, this subscription includes making a call to request to get some data from the upstream. By default, flatMapSingle just asks for as much data as possible. To restrict this, we can utilize an overload of flatMapSingle.

.flatMapSingle(::webRequest, false, 1)

Here we are telling flatMapSingle that we only want 1 object at a time (our maxConcurrency value). The second argument, false, is the delayErrors flag. We end up with the following output:

Start
B: 1
A: 1
Preparing 0
Got 0
B: 1
Completed

That looks a lot better, and is what we really wanted in the first place. In fact, we can simplify our example. Originally, we were trying to restrict our data load with a ResourceSubscriber. This is not necessary. Because we have told our flatMapSingle to only accept one item at a time, we can eliminate this subscriber and utilize a simpler subscribe call.

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .take(5)
    .onBackpressureDrop()
    .flatMapSingle(::webRequest, false, 1)
    .subscribe { println("Got $it") }
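The effect of maxConcurrency on the very first upstream request can also be observed without threads or timers. The sketch below is an illustration under assumptions: RxJava 2 package names, a synchronous Flowable.range standing in for the button presses, Single.just standing in for webRequest, and firstRequest as a hypothetical helper name.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single

// Returns the size of the first request that flatMapSingle sends upstream.
// Passing null uses the single-argument flatMapSingle, i.e. the default concurrency.
fun firstRequest(maxConcurrency: Int?): Long {
    val requests = mutableListOf<Long>()
    val source = Flowable.range(0, 5).doOnRequest { requests.add(it) }
    val mapped = if (maxConcurrency == null) {
        source.flatMapSingle { Single.just(it) }
    } else {
        source.flatMapSingle({ Single.just(it) }, false, maxConcurrency)
    }
    mapped.test() // subscribe with unbounded downstream demand
    return requests.first()
}

fun main() {
    println(firstRequest(null)) // 9223372036854775807
    println(firstRequest(1))    // 1
}
```

Even though the downstream demand is unbounded in both runs, the bounded overload only ever asks the upstream for as many items as it is allowed to process concurrently.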

There are similar overloads for other flatMap methods which you can utilize to get similar behaviors. The takeaway here is that you should look for opportunities to restrict the flow of data and decide on a backpressure strategy as early as possible. It will lead to less processing in the long run.

Kotlin: {} vs. ()

Another point of contention that comes up a lot with RxJava is the hours lost to the difference between {} and () when invoking operators. This next example hopes to shed some light on the difference.

Consider the function createCompletable, which creates a Completable that simply prints off a given label, goes to sleep for 100ms, and then completes.

fun createCompletable(label: String) =
    Completable.create {
        println("Created Completable $label")
        Thread.sleep(100)
        it.onComplete()
    }

We can construct a chain of Completable objects utilizing andThen. One possible valid implementation looks like so:

createCompletable("A")
    .andThen { createCompletable("B") }
    .subscribe()

However, when we run this, all we see is:

Created Completable A

What happened to B? Well, we used {} instead of (). In Kotlin, there is syntactic sugar that lets you move a lambda outside of a method’s () provided it is the last argument. The confusion arises because andThen accepts a SAM interface, CompletableSource. This interface has a single method, subscribe, with the signature (CompletableObserver) -> Unit, so Kotlin treats our {} block as the implementation of that method. Nothing is returned from it, so the Completable we build inside the block is simply discarded.

In our example, we are calling createCompletable("B"). The call does return a Completable, but the body of that Completable never runs, because nothing ever subscribes to it. RxJava is lazy by default. Creating a Completable, Single, Maybe, etc. only creates the method by which to process a stream, but does not begin execution. The stream is not created and the operators are not invoked until such time as some interested party subscribes. We can fix our case in one of two ways. The first is to subscribe the observer we receive as the lambda’s input argument.

createCompletable("A")
    .andThen { createCompletable("B").subscribe(it) }
    .subscribe()

The second is to utilize () instead of {} since a Completable is a CompletableSource.

createCompletable("A")
    .andThen(createCompletable("B"))
    .subscribe()

In both cases the Completable returned by createCompletable("B") is properly subscribed to and run. This results in the expected output:

Created Completable A
Created Completable B
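The separation between building a chain and running it can be verified with a small synchronous sketch. It assumes RxJava 2 package names; the step helper is a hypothetical stand-in for createCompletable that records to a list instead of printing and sleeping.

```kotlin
import io.reactivex.Completable

// Returns the order of events: assembling the chain versus running its steps.
fun lazinessDemo(): List<String> {
    val log = mutableListOf<String>()

    // Hypothetical helper mirroring createCompletable, minus the sleep.
    fun step(label: String) = Completable.fromAction { log.add("ran $label") }

    val chain = step("A").andThen(step("B")) // only describes the work
    log.add("assembled")
    chain.subscribe()                        // the work happens here, on the calling thread
    return log.toList()
}

fun main() {
    println(lazinessDemo()) // [assembled, ran A, ran B]
}
```

Both steps are created before "assembled" is logged, yet neither runs until subscribe is called on the finished chain.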

Conclusions

RxJava is a massive, complex library. It is trying to do a lot of different things, and sometimes its helpful nature can hinder you in unexpected ways. Being able to separate creation from execution, and understanding the tools necessary to determine when RxJava is not being as lazy as you would like are key to a happy, successful experience. I hope that this post was able to explain away some of the ambiguity.

Written by Alex Hart

Avid gamer, motorcycle enthusiast, and Senior Mobile Developer at REDspace. Passionate about applying new, innovative concepts and patterns in my work.