Understanding RxJava subscribeOn and observeOn

James Shvarts
Published in ProAndroidDev, Sep 12, 2017


https://www.flickr.com/photos/marionchantal/24195403325

One of the biggest strengths of RxJava is its ability to easily schedule work and process results on various threads. This article aims to give you a solid foundation of working with threads in RxJava and RxAndroid to optimize system performance while avoiding bugs (threading-related bugs are notoriously hard to track down).

RxJava Schedulers

Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it takes a thread from its pool and runs the task on that thread.

Let’s summarize available Scheduler types and their common uses:

  1. Schedulers.io() is backed by an unbounded thread pool that grows as needed and reuses idle threads. It is used for non-CPU-intensive I/O work, including interaction with the file system, network calls, database interactions, etc. This thread pool is intended for asynchronously performing blocking I/O.
  2. Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.
  3. Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive, as a new thread is spawned every time and no reuse happens.
  4. Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until the executor is explicitly shut down.
  5. Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread, to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.
  6. Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
  7. Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.
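To make the Schedulers.from() item concrete, here is a minimal sketch, assuming RxJava 2 (package io.reactivex) is on the classpath. It caps concurrency at two threads with Executors.newFixedThreadPool(2); the class name BoundedSchedulerDemo and the count of 10 tasks are illustrative only. It uses flatMap(), which is covered later in the article, to run the tasks concurrently.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BoundedSchedulerDemo {

    // Runs 10 small tasks concurrently on a Scheduler backed by a fixed pool of
    // 2 threads and returns the names of the threads that did the work.
    static Set<String> threadsUsed() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Scheduler bounded = Schedulers.from(executor);
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            Observable.range(1, 10)
                    // each inner Observable is subscribed on the bounded Scheduler;
                    // when both threads are busy, further tasks wait in the pool's queue
                    .flatMap(i -> Observable.just(i)
                            .subscribeOn(bounded)
                            .doOnNext(x -> names.add(Thread.currentThread().getName())))
                    .blockingSubscribe();  // wait on the calling thread until all tasks finish
        } finally {
            executor.shutdown();  // the pool's threads live until the executor is shut down
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + threadsUsed());
    }
}
```

However many tasks are submitted, at most two distinct thread names are reported.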

WARNING: Be careful writing multi-threaded code using unbounded thread Schedulers such as Schedulers.io() and Schedulers.newThread(). Depending on your data stream and the transformations you apply to it, it’s easier than you think to flood your system with threads.

Default threading in RxJava

If you don’t specify threading in RxJava (that is, if you use neither subscribeOn nor observeOn), the data will be emitted and processed by the current scheduler/thread (usually the main thread). For instance, in a chain without either operator, every operator runs on the thread that called subscribe().

Note: some operators, such as interval, operate on a computation thread by default. See below for more details.
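Both behaviors can be checked with a small sketch, assuming RxJava 2 on the classpath (the class and method names are hypothetical). The first chain has no Scheduler operators, so map() runs on the calling thread; the second uses interval(), whose default Scheduler is computation.

```java
import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

class DefaultThreadingDemo {

    // No subscribeOn/observeOn: emission and map run on the calling thread.
    static String mapThread() {
        return Observable.just("long")
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // interval() has a default Scheduler (computation), so it emits on an
    // RxComputationThreadPool thread even without subscribeOn.
    static String intervalThread() {
        return Observable.interval(10, TimeUnit.MILLISECONDS)
                .map(tick -> Thread.currentThread().getName())
                .blockingFirst();  // take the first tick, then dispose
    }

    public static void main(String[] args) {
        System.out.println("map ran on " + mapThread());           // the calling thread, e.g. main
        System.out.println("interval ran on " + intervalThread()); // RxComputationThreadPool-N
    }
}
```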

We can specify a thread to execute any operator by using subscribeOn and/or observeOn.

  • subscribeOn affects upstream operators (operators above the subscribeOn)
  • observeOn affects downstream operators (operators below the observeOn)
  • If only subscribeOn is specified, all operators will be executed on that thread
  • If only observeOn is specified, all operators will be executed on the current thread and only operators below the observeOn will be switched to thread specified by the observeOn

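For instance, consider a chain that emits items with just, transforms them with map, and then applies subscribeOn(Schedulers.io()), observeOn(Schedulers.computation()) and filter, in that order.

The emission from just and the map operator will be executed on the io scheduler, as directed by subscribeOn, which affects the operators upstream of it.

filter will be executed on the computation scheduler, as directed by observeOn, which affects the operators downstream of it.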
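The chain described above can be sketched as follows, assuming RxJava 2 on the classpath. The strings and the filter condition are illustrative, and each step records the name of the thread it ran on so the Scheduler assignment is visible.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SubscribeOnObserveOnDemo {

    private static String thread() {
        return Thread.currentThread().getName();
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .map(s -> {
                    log.add("map on " + thread());
                    return s.length();
                })
                .subscribeOn(Schedulers.io())          // just + map (upstream) run on io
                .observeOn(Schedulers.computation())   // filter (downstream) runs on computation
                .filter(length -> {
                    log.add("filter on " + thread());
                    return length > 4;
                })
                .blockingSubscribe();                  // demo only: wait for completion
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The map entries report RxCachedThreadScheduler-N (the io pool) and the filter entries report RxComputationThreadPool-N.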

Read on for more details, ways to debug, and nuances of the threading operators in RxJava.

Debugging threading

As before, let’s look at a basic RxJava chain where we emit Strings and calculate their lengths.

When executed, this will print:

item length 4
item length 6
item length 7
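A chain that produces this output might look like the sketch below, assuming RxJava 2 on the classpath. The input strings "long", "longer" and "longest" are an assumption chosen to match the printed lengths 4, 6 and 7; the lengths() helper is added only so the result can be checked.

```java
import io.reactivex.Observable;

import java.util.List;

class BasicChainDemo {

    // Same chain, collected into a list so the result can be inspected.
    static List<Integer> lengths() {
        return Observable.just("long", "longer", "longest")
                .map(String::length)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        Observable.just("long", "longer", "longest")
                .map(String::length)  // transform each String into its length
                .subscribe(length -> System.out.println("item length " + length));
    }
}
```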

Now, let’s see what thread this work is being done on by printing out thread info in doOnNext(), a side-effect operator that gets executed for each item emitted.

When executed, this will print:

processing item on thread main
item length 4
processing item on thread main
item length 6
processing item on thread main
item length 7
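Here is a sketch of that version, assuming RxJava 2 on the classpath and the same hypothetical input strings. The lines are collected into a list so they can be checked; when run() is called from main(), the printed lines match the output above.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class DoOnNextDemo {

    static List<String> run() {
        List<String> out = new ArrayList<>();
        Observable.just("long", "longer", "longest")
                // side effect: record which thread handles each emitted item
                .doOnNext(s -> out.add("processing item on thread " + Thread.currentThread().getName()))
                .map(String::length)
                .subscribe(length -> out.add("item length " + length));
        return out;  // safe to read here: everything above ran synchronously
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```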

So this stream is being emitted and processed on the main thread, which makes sense because the block of code above resides inside the main method of my class.

Doing work on background thread

Often it makes sense to delegate certain work to a background thread. A typical example would be offloading an IO operation from the main thread. RxJava makes it easy.

The subscribeOn() operator tells the source Observable which thread to emit and push items on, all the way down to the Observer (hence, it affects both upstream and downstream operators). It does not matter where you put subscribeOn() in your Observable chain of operators.

Things to remember about our Observable are:

  1. It is a “cold” Observable, which means the emission occurs lazily, only when a Subscriber is added (a call to subscribe() is made). A new emission will happen for each subscriber added.
  2. subscribeOn() specifies a Scheduler (thread pool) where the work will be performed after subscription is made in subscribe().
  3. The results of transformation are received on the same thread as the thread that did the actual work. This can be changed using observeOn() as we’ll see soon.

Let’s run the updated code example inside the main method.

Using subscribeOn()

When executed, this will print nothing!

This is because the main method finished executing before the background thread returned results. To get around this, let’s keep the main method alive for an additional 3 seconds with Thread.sleep(3000) — long enough to give our Observable a chance to fire emissions on the background thread.
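A sketch of the subscribeOn() example with the 3-second pause, assuming RxJava 2 on the classpath and the same hypothetical input strings. In RxJava 2 the Scheduler threads are daemon threads, which is why the JVM can exit before they emit; removing the Thread.sleep() call typically leaves the list empty, so nothing is printed.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SubscribeOnDemo {

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.newThread())  // emit and process on a new background thread
                .doOnNext(s -> out.add("processing item on thread " + Thread.currentThread().getName()))
                .map(String::length)
                .subscribe(length -> out.add("item length " + length
                        + " received on " + Thread.currentThread().getName()));
        // subscribe() returns immediately, so keep the caller alive long enough
        // for the background thread to emit (the approach used in the article)
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A fixed sleep is fine for a demo; a CountDownLatch released in onComplete would wait exactly as long as needed.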

When executed, this will print:

processing item on thread RxNewThreadScheduler-1
item length 4 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 6 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 7 received on RxNewThreadScheduler-1

The results of the background thread work are returned on the same thread, RxNewThreadScheduler-1.

When performing network, I/O or computation tasks, using a background scheduler is crucial. Without subscribeOn(), your code will use the calling thread to perform operations, making the Observable blocking.

Understanding observeOn()

As we saw above, subscribeOn() instructs the source Observable which thread to emit items on — this thread will push the emissions all the way to our Observer. However, if it encounters an observeOn() anywhere in the chain, it will switch and pass emissions using that Scheduler for the remaining (downstream) operations.

Usually the observing thread in Android is the main (UI) thread, AndroidSchedulers.mainThread(). This requires the RxAndroid extension library to RxJava.

Let’s modify our example code to perform background work on Schedulers.newThread() but then switch to AndroidSchedulers.mainThread().

Using observeOn()
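AndroidSchedulers.mainThread() only works on Android, so this sketch, which assumes RxJava 2 on the classpath, substitutes a hypothetical stand-in: a single-thread executor whose thread is named "ui-thread", wrapped with Schedulers.from(). On Android you would pass AndroidSchedulers.mainThread() to observeOn() instead.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ObserveOnDemo {

    static List<String> run() {
        // Hypothetical stand-in for AndroidSchedulers.mainThread() on a plain JVM.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> out = new CopyOnWriteArrayList<>();
        try {
            Observable.just("long", "longer", "longest")
                    .subscribeOn(Schedulers.newThread())  // emission, doOnNext and map run here
                    .doOnNext(s -> out.add("processing item on thread " + Thread.currentThread().getName()))
                    .map(String::length)
                    .observeOn(Schedulers.from(ui))       // everything below runs on "ui-thread"
                    .doOnNext(length -> out.add("item length " + length
                            + " received on thread " + Thread.currentThread().getName()))
                    .blockingSubscribe();                 // demo only: wait for completion
        } finally {
            ui.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```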

When executed, we will see that now results are received by the main thread.

processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 4 received on thread main
item length 6 received on thread main
item length 7 received on thread main

Doing work asynchronously

While RxJava is known as a library for composing asynchronous and event-based programs using observable sequences, there are plenty of useful tasks it can do synchronously. For instance, map(String::length) above handles each item sequentially on the same thread, RxNewThreadScheduler-1, preserving the original order.

Simply using subscribeOn() at the start of an Observable chain means the process is still operating on a single thread and emitting items synchronously downstream. However, when you start combining different streams on different threads or use operators such as observeOn(), interval(), delay(), your Observable chain is no longer synchronous.

Now, let’s see how the example above can be modified so that each item emitted is processed by a separate thread simultaneously.

Introducing flatMap()

flatMap() maps each item emitted by an Observable to an inner Observable of its own, letting you apply RxJava operators to it, including assigning a new Scheduler with subscribeOn() to run those operators. The inner Observables are then merged back into a single Observable; their items are passed on as soon as each inner Observable produces them, so they arrive in no particular order.

To make things more realistic, let us pretend that the transformation for each item takes up to 3 seconds to complete. The following two things should hold true:

  1. Each item is processed by its own thread
  2. Due to the random time it takes to process each item, the order in which items complete is not guaranteed.

Using flatMap()
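A sketch of the flatMap() version, assuming RxJava 2 on the classpath. slowLength() is a hypothetical stand-in for the slow transformation: it sleeps a random time of up to 3 seconds before returning the length.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

class FlatMapDemo {

    // Hypothetical slow transformation: sleeps a random time up to 3 seconds.
    static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(3000));
        return s.length();
    }

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .flatMap(s -> Observable.just(s)
                        .subscribeOn(Schedulers.newThread())  // each item gets its own thread
                        .doOnNext(x -> out.add("processing item on thread " + Thread.currentThread().getName()))
                        .map(FlatMapDemo::slowLength))
                // merged results: runs on a thread belonging to one of the inner Observables
                .doOnNext(length -> out.add("received item length " + length
                        + " on thread " + Thread.currentThread().getName()))
                .blockingSubscribe();  // demo only: wait until all three items arrive
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```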

This will result in the following output:

processing item on thread RxNewThreadScheduler-3
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 7 on thread RxNewThreadScheduler-3
received item length 4 on thread RxNewThreadScheduler-1
received item length 6 on thread RxNewThreadScheduler-2

Notice that a) each item was processed by a separate thread and b) the order of the elements after the transformation is random. So flatMap() worked exactly as we expected.

What if you need to preserve the order of the resulting items?

Introducing concatMap()

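concatMap() is similar to flatMap() but guarantees that the items come out in the same order as in the original emission. It does so by subscribing to each inner Observable only after the previous one has completed, so the items are no longer processed in parallel.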

Using concatMap()
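The same sketch with concatMap() in place of flatMap(), again assuming RxJava 2 and the hypothetical slowLength() stand-in. Because each inner Observable is subscribed only after the previous one completes, a run can take up to about 9 seconds.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

class ConcatMapDemo {

    // Hypothetical slow transformation: sleeps a random time up to 3 seconds.
    static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(3000));
        return s.length();
    }

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .concatMap(s -> Observable.just(s)
                        .subscribeOn(Schedulers.newThread())  // still a new thread per item
                        .doOnNext(x -> out.add("processing item on thread " + Thread.currentThread().getName()))
                        .map(ConcatMapDemo::slowLength))
                // results arrive in the original order: 4, 6, 7
                .doOnNext(length -> out.add("received item length " + length
                        + " on thread " + Thread.currentThread().getName()))
                .blockingSubscribe();  // demo only: wait for completion
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```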

This results in the following output:

processing item on thread RxNewThreadScheduler-1
received item length 4 on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 6 on thread RxNewThreadScheduler-2
processing item on thread RxNewThreadScheduler-3
received item length 7 on thread RxNewThreadScheduler-3

Note that the items are returned in the same order as in the original stream.

subscribeOn() gotchas

As seen above, subscribeOn() changes the thread on which our Observable is emitted and transformed. In the absence of observeOn(), the results of the stream processing are sent to the thread that did the work (thread specified in subscribeOn()). For instance, if we have subscribeOn(Schedulers.computation()) and observeOn() is not specified, the results are dispatched to the Computation thread as well.

It does not matter where you put the subscribeOn() operator within your chain: it will still determine the thread on which the Observable is emitted.

If you specify multiple subscribeOn() operators in your chain, only the first one (the one closest to the source) determines where items are emitted, and the following ones are effectively ignored, unless subscribeOn() is used inside flatMap() as seen above.

Here is an example:

Using multiple subscribeOn()
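A sketch of a chain with two subscribeOn() calls, assuming RxJava 2 on the classpath and the same hypothetical input strings. The final doOnNext() stands in for the subscriber so the receiving thread can be recorded; blockingSubscribe() only waits for completion.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MultipleSubscribeOnDemo {

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.computation())  // closest to the source: decides the emission thread
                .doOnNext(s -> out.add("processing item on thread " + Thread.currentThread().getName()))
                .map(String::length)
                .subscribeOn(Schedulers.newThread())    // only carries the subscribe call; emissions stay on computation
                .doOnNext(length -> out.add("item length " + length
                        + " received on " + Thread.currentThread().getName()))
                .blockingSubscribe();                   // demo only: wait for completion
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```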

This will result in the following output:

processing item on thread RxComputationThreadPool-1
item length 4 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 6 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 7 received on RxComputationThreadPool-1

Note that the Schedulers.computation() thread pool above did the work, while Schedulers.newThread() was never used for emitting or processing items. This is because the computation Scheduler was listed first, so the subsequent subscribeOn() had no effect on the emission thread.

Default Schedulers

Some libraries and operators choose a Scheduler internally to enforce which thread does their work. For instance, Observable.delay() from the RxJava library emits its delayed items on the computation Scheduler by default, and a subscribeOn() you apply to the chain will not change that. However, you can use an overload of the operator that accepts a Scheduler, such as delay(long, TimeUnit, Scheduler), to pass a custom Scheduler of your choice.

delay() API and use of Schedulers
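A sketch comparing the default delay() with the overload that takes a Scheduler, assuming RxJava 2 on the classpath. Schedulers.single() is an arbitrary choice of custom Scheduler, and the 50 ms delay is illustrative.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

class DelaySchedulerDemo {

    // delay() with its default Scheduler: the delayed item arrives on computation,
    // even though the chain asked for io via subscribeOn.
    static String defaultDelayThread() {
        return Observable.just("long")
                .subscribeOn(Schedulers.io())
                .delay(50, TimeUnit.MILLISECONDS)
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // The overload taking a Scheduler overrides that default.
    static String customDelayThread() {
        return Observable.just("long")
                .delay(50, TimeUnit.MILLISECONDS, Schedulers.single())
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("default delay emits on " + defaultDelayThread()); // RxComputationThreadPool-N
        System.out.println("custom delay emits on " + customDelayThread());   // RxSingleScheduler-N
    }
}
```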

Pro-tip: RxLint can warn you when you use an operator such as delay() without overriding its default Scheduler.

What this also means is that when you use Scheduler-dependent operators such as delay(), interval(), etc. while using subscribeOn(), you may be spawning (but not using) a thread without realizing it. Always review the Javadoc for those operators to ensure the optimal usage. In particular, pay attention to @SchedulerSupport annotation.

onError()

Finally, when subscribeOn() is used but no onError() handler is passed to subscribe(), any error will be thrown on the subscribed Scheduler thread, and its stack trace will have no reference to the place where you subscribed. This makes debugging extremely hard. To avoid the issue, always supply an onError() handler.
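A sketch of supplying onError() alongside subscribeOn(), assuming RxJava 2 on the classpath. The failure inside map() is hypothetical, added only to trigger the error path, and the CountDownLatch keeps the demo alive until the stream terminates.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class OnErrorDemo {

    static String run() {
        AtomicReference<String> captured = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.newThread())
                .map(s -> {
                    // hypothetical failure, for illustration only
                    if (s.length() > 6) throw new IllegalStateException("too long: " + s);
                    return s.length();
                })
                .subscribe(
                        length -> System.out.println("item length " + length),
                        error -> {
                            // handled here instead of surfacing on the Scheduler thread
                            captured.set(error.getMessage());
                            done.countDown();
                        },
                        done::countDown);
        try {
            done.await();  // wait for onError or onComplete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println("handled error: " + run());
    }
}
```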

observeOn() gotchas

It’s important to remember that unlike subscribeOn(), placement of observeOn() matters. Switching scheduler with observeOn() applies to all downstream operators (operators listed below observeOn()).

For instance, if observeOn(AndroidSchedulers.mainThread()) is placed right after subscribeOn(), then map(String::length) and filter(length -> length == 6) below it will be executed on the main thread. Is this really what was intended?
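A sketch of that premature switch, assuming RxJava 2 on the classpath. As before, a single thread named "ui-thread" is a hypothetical stand-in for AndroidSchedulers.mainThread() so the code runs on a plain JVM.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ObserveOnPlacementDemo {

    private static String thread() {
        return Thread.currentThread().getName();
    }

    static List<String> run() {
        // Hypothetical stand-in for AndroidSchedulers.mainThread() on a plain JVM.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> out = new CopyOnWriteArrayList<>();
        try {
            Observable.just("long", "longer", "longest")
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.from(ui))   // switches too early...
                    .map(s -> {                       // ...so map runs on the UI thread
                        out.add("map on " + thread());
                        return s.length();
                    })
                    .filter(length -> {               // ...and so does filter
                        out.add("filter on " + thread());
                        return length == 6;
                    })
                    .blockingSubscribe();             // demo only: wait for completion
        } finally {
            ui.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```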

Be careful where you put the observeOn() operator because it changes the Scheduler performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.

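For instance, consider an RxJava chain that makes an HTTP network call on an IO thread but calls observeOn(AndroidSchedulers.mainThread()) above the map() operator that reads the response.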

observeOn() before map()

There is no reason to have the observeOn() operator applied above the map() operator. In fact, this code will result in a NetworkOnMainThreadException! We do not want to read the HTTP response on the main thread; that should be done before we switch back to the main thread.
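A sketch of the corrected ordering, assuming RxJava 2 on the classpath. fetchBody() is a hypothetical stand-in for a blocking HTTP call (it only sleeps; no network request is made), map(String::length) stands in for parsing the response, and a thread named "ui-thread" stands in for AndroidSchedulers.mainThread().

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NetworkOrderingDemo {

    // Hypothetical stand-in for a blocking HTTP call that reads a response body.
    static String fetchBody(String url) throws InterruptedException {
        Thread.sleep(100);  // pretend network latency
        return "response from " + url;
    }

    static List<String> run() {
        // Hypothetical stand-in for AndroidSchedulers.mainThread() on a plain JVM.
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        List<String> out = new CopyOnWriteArrayList<>();
        try {
            Observable.fromCallable(() -> {
                        out.add("reading response on " + Thread.currentThread().getName());
                        return fetchBody("https://example.com/data");
                    })
                    .map(String::length)               // "parse" while still on the io thread
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.from(ui))    // switch only for the final result
                    .doOnNext(length -> out.add("length " + length
                            + " received on " + Thread.currentThread().getName()))
                    .blockingSubscribe();              // demo only: wait for completion
        } finally {
            ui.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```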

Be wary of multiple observeOn()

You can have multiple observeOn() operators. As operators are executed downstream, each observeOn() below will override the one above.

Here is an example:

Using multiple observeOn()
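A chain consistent with the output below might look like this sketch, assuming RxJava 2 on the classpath and the same hypothetical input strings. The last doOnNext() stands in for the subscriber so the receiving thread can be recorded.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MultipleObserveOnDemo {

    private static String thread() {
        return Thread.currentThread().getName();
    }

    static List<String> run() {
        List<String> out = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.newThread())
                .doOnNext(s -> out.add("first doOnNext: processing item on thread " + thread()))
                .observeOn(Schedulers.computation())  // overrides newThread from here down
                .doOnNext(s -> out.add("second doOnNext: processing item on thread " + thread()))
                .map(String::length)
                .observeOn(Schedulers.io())           // overrides computation from here down
                .doOnNext(length -> out.add("received item length " + length + " on thread " + thread()))
                .blockingSubscribe();                 // demo only: wait for completion
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```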

The output is as follows:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

As a final note, I would recommend avoiding this kind of complexity if at all possible. Doing so will make the code significantly easier to debug and maintain. If you are not convinced, check out Dan Lew’s podcast linked in the Recommended resources section.

Recommended resources

  1. Reactive Programming on Android with RxJava by Chris Arriola
  2. Articles by Thomas Nield on RxJava and maximizing parallelization (parts 1, 2 and 3)
  3. Learning RxJava, a book by Thomas Nield
  4. Dan Lew’s Fragmented Podcast talk on the importance of keeping it simple with RxJava
  5. Exploring RxJava 2 for Android, a talk by Jake Wharton

Thanks to Alex Hart for his input with this article.

Visit my Android blog to read about Jetpack Compose and other Android topics
