Understanding RxJava subscribeOn and observeOn

One of the biggest strengths of RxJava is its ability to easily schedule work and process results on various threads. This article aims to give you a solid foundation of working with threads in RxJava and RxAndroid to optimize system performance while avoiding bugs (threading-related bugs are notoriously hard to track down).

RxJava Schedulers

Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it takes a thread from its pool and runs the task on that thread.

Let’s summarize available Scheduler types and their common uses:

  1. Schedulers.io() is backed by an unbounded thread pool that grows as needed and reuses idle threads. It is used for I/O-bound work that is not CPU-intensive, such as interacting with the file system, performing network calls or talking to a database. This thread pool is intended for asynchronously performing blocking I/O.
  2. Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.
  3. Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive because a new thread is spawned every time and no reuse happens.
  4. Schedulers.from(Executor executor) creates and returns a custom Scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled while all threads are occupied, it will be queued. The threads in the pool exist until the executor is explicitly shut down.
  5. The main thread, AndroidSchedulers.mainThread(), is provided by RxAndroid, an extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Take care not to overload this thread, to avoid a janky, unresponsive UI or, worse, the “Application Not Responding” (ANR) dialog.
  6. Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
  7. Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.

WARNING: Be careful writing multi-threaded code using unbounded thread Schedulers such as Schedulers.io() and Schedulers.newThread(). Depending on your data stream and the transformations you apply to it, it’s easier than you think to flood your system with threads.
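To see which threads these Schedulers actually hand out, here is a small sketch. It assumes RxJava 2 (the io.reactivex packages) on a plain JVM, so AndroidSchedulers is left out; the class SchedulerThreads and its helper threadNameOn are illustrative names, not part of RxJava. The comments show RxJava 2's default thread-name prefixes.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerThreads {

    // Hypothetical helper: emits one item on the given Scheduler and
    // returns the name of the thread that emitted it.
    static String threadNameOn(Scheduler scheduler) {
        return Observable.just(1)
                .subscribeOn(scheduler)
                .map(i -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("io():          " + threadNameOn(Schedulers.io()));          // RxCachedThreadScheduler-N
        System.out.println("computation(): " + threadNameOn(Schedulers.computation())); // RxComputationThreadPool-N
        System.out.println("newThread():   " + threadNameOn(Schedulers.newThread()));   // RxNewThreadScheduler-N
        System.out.println("single():      " + threadNameOn(Schedulers.single()));      // RxSingleScheduler-N
        System.out.println("trampoline():  " + threadNameOn(Schedulers.trampoline()));  // the calling thread (main)

        // from(): at most two threads; extra tasks wait in the executor's queue.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("from(pool):    " + threadNameOn(Schedulers.from(pool))); // pool-N-thread-M
        } finally {
            pool.shutdown(); // the pool's threads live until the executor is shut down
        }
    }
}
```

Note that trampoline() does not switch threads at all: it queues work on whichever thread is already running.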

Show me the code

Let’s start with a basic RxJava example in which an Observable<String> emits items and the length of each item is calculated. We will build upon this example in the following sections.
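The original snippet is not reproduced here, so the following is a minimal sketch assuming RxJava 2 and the input strings "long", "longer" and "longest", chosen only because their lengths match the output below. The class name BasicChain is illustrative.

```java
import io.reactivex.Observable;

class BasicChain {

    // Assumed input: three strings whose lengths (4, 6, 7) match the printed output.
    static Observable<Integer> lengths() {
        return Observable.just("long", "longer", "longest")
                .map(String::length); // transform each String into its length
    }

    public static void main(String[] args) {
        lengths().subscribe(length -> System.out.println("item length " + length));
    }
}
```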

When executed, this will print:

item length 4
item length 6
item length 7

Now, let’s see which thread this work is done on by printing out thread info in doOnNext(), a side-effect operator that gets executed for each item emitted.
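A sketch of that change, again assuming RxJava 2 and the same assumed input strings. Besides printing, the illustrative helper processingThreads() records each thread name so the behavior can be checked.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class ThreadLogging {

    // Runs the chain and returns the thread name seen by doOnNext() for each item.
    static List<String> processingThreads() {
        List<String> threads = new ArrayList<>();
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> {
                    String name = Thread.currentThread().getName();
                    threads.add(name);
                    System.out.println("processing item on thread " + name);
                })
                .map(String::length)
                .subscribe(length -> System.out.println("item length " + length));
        // No Scheduler is involved, so subscribe() ran synchronously and the list is complete.
        return threads;
    }

    public static void main(String[] args) {
        processingThreads(); // called from main, so every item is processed on thread "main"
    }
}
```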

When executed, this will print:

processing item on thread main
item length 4
processing item on thread main
item length 6
processing item on thread main
item length 7

So this stream is emitted and processed on the main thread, which makes sense because the code above runs inside the main method of my class.

Doing work on background thread

Often it makes sense to delegate certain work to a background thread (that is, to run a task asynchronously). A typical example is offloading an I/O operation from the main thread. RxJava makes this easy.

The subscribeOn() operator tells the source Observable which thread to emit and transform items on. It does not matter where in your chain of operators you put subscribeOn().

Things to remember about our Observable are:

  1. It is a “cold” Observable, which means emission occurs lazily, only when a Subscriber is added (a call to subscribe() is made). A new emission happens for each subscriber added.
  2. subscribeOn() specifies a Scheduler (thread pool) where the work will be performed after subscription is made in subscribe().
  3. The results of transformation are received on the same thread as the thread that did the actual work. This can be changed using observeOn() as we’ll see soon.
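Point 1 above can be observed directly. This sketch (assuming RxJava 2) uses Observable.fromCallable, which defers its work until subscription; the run counter and the class name ColdObservableDemo are illustrative additions.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

class ColdObservableDemo {

    // Counts how many times the source runs when two subscribers are added.
    static int sourceRunsForTwoSubscribers() {
        AtomicInteger runs = new AtomicInteger();
        Observable<String> cold = Observable.fromCallable(() -> {
            runs.incrementAndGet(); // executed once per subscription, not at creation
            return "long";
        });
        // At this point runs is still 0: creating the Observable does no work.
        cold.subscribe(s -> System.out.println("first subscriber got " + s));
        cold.subscribe(s -> System.out.println("second subscriber got " + s));
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("source ran " + sourceRunsForTwoSubscribers() + " times");
    }
}
```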

Let’s run the updated code example inside the main method.

Using subscribeOn()

When executed, this will print nothing!

This is because the main method returns before the background thread delivers any results, and since RxJava’s scheduler threads are daemon threads, the JVM exits without waiting for them. To get around this, let’s keep the main method alive for an additional 3 seconds with Thread.sleep(3000), which is long enough to get the response from the background thread. The same trick can be used for the other examples below.
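Here is a sketch of the subscribeOn() version with the sleep added, assuming RxJava 2 and the same assumed input strings. The extra workerThreads() method is an illustrative, blocking variant that makes the threading checkable without a sleep.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

class SubscribeOnDemo {

    public static void main(String[] args) throws InterruptedException {
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> System.out.println("processing item on thread "
                        + Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread()) // emit and transform on a new background thread
                .map(String::length)
                .subscribe(length -> System.out.println("item length " + length
                        + " received on " + Thread.currentThread().getName()));

        // subscribe() returns immediately; keep main alive so the
        // background thread has time to finish.
        Thread.sleep(3000);
    }

    // Blocking variant for checking: returns the thread each item was mapped on.
    static List<String> workerThreads() {
        return Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.newThread())
                .map(s -> Thread.currentThread().getName())
                .toList()
                .blockingGet();
    }
}
```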

When executed, this will print:

processing item on thread RxNewThreadScheduler-1
item length 4 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 6 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 7 received on RxNewThreadScheduler-1

Note that since no observeOn() was specified, the results of the background thread work are returned (observed) on the same thread, RxNewThreadScheduler-1.

When performing network, I/O or computation tasks, using a background Scheduler is crucial. Without subscribeOn(), your code performs the work on the calling thread, so subscribe() blocks until the Observable has finished emitting.
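The blocking behavior can be measured by counting how many items were processed by the time subscribe() returns. This is a sketch assuming RxJava 2; the 100 ms sleep is a hypothetical stand-in for slow, blocking work, and CallerThreadDemo is an illustrative name.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

class CallerThreadDemo {

    // Returns how many items were already processed when subscribe() returned.
    static int processedWhenSubscribeReturns(boolean useSubscribeOn) {
        AtomicInteger processed = new AtomicInteger();
        Observable<String> chain = Observable.just("long", "longer", "longest")
                .doOnNext(s -> {
                    Thread.sleep(100); // hypothetical stand-in for slow, blocking I/O
                    processed.incrementAndGet();
                });
        if (useSubscribeOn) {
            chain = chain.subscribeOn(Schedulers.io()); // hand the work to a background thread
        }
        chain.subscribe(); // without subscribeOn(), this blocks the caller for about 300 ms
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("without subscribeOn: " + processedWhenSubscribeReturns(false)); // 3
        System.out.println("with subscribeOn:    " + processedWhenSubscribeReturns(true));  // typically 0
    }
}
```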

Observing results with observeOn()

As we saw above, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push the items all the way to the Subscriber. However, if it encounters an observeOn() somewhere in the chain, it will then pass emissions to that thread for the remaining (downstream) operations.

Usually the observing thread in Android is the main (UI) thread, AndroidSchedulers.mainThread(). This requires the RxAndroid extension library.

Let’s modify our example code to perform background work on Schedulers.newThread() and observe results on AndroidSchedulers.mainThread().

Using observeOn()
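AndroidSchedulers.mainThread() needs an Android Looper, so this JVM sketch (assuming RxJava 2) substitutes Schedulers.single() as a stand-in for the observing thread; on Android you would pass AndroidSchedulers.mainThread() instead. The class ObserveOnDemo and its run() helper are illustrative.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ObserveOnDemo {

    // Records the thread used before and after observeOn() for each item.
    static void run(Scheduler observer, List<String> processing, List<String> received) {
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> processing.add(Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread()) // background work
                .map(String::length)
                .observeOn(observer)                 // everything below runs on `observer`
                .doOnNext(length -> received.add(Thread.currentThread().getName()))
                .blockingSubscribe();                // wait for completion (JVM demo only)
    }

    public static void main(String[] args) {
        List<String> processing = new CopyOnWriteArrayList<>();
        List<String> received = new CopyOnWriteArrayList<>();
        // On Android: AndroidSchedulers.mainThread() instead of Schedulers.single().
        run(Schedulers.single(), processing, received);
        System.out.println("processing on " + processing);
        System.out.println("received on   " + received);
    }
}
```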

When executed, we will see that now results are received by the main thread.

processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 4 received on thread main
item length 6 received on thread main
item length 7 received on thread main

Doing work asynchronously

While RxJava is known as a library for composing asynchronous and event-based programs using observable sequences, there are plenty of useful tasks it can do synchronously. For instance, map(String::length) above handles each item sequentially on the same thread, RxNewThreadScheduler-1, preserving the original order.

Now, let’s see how the example above can be modified so that each item emitted is processed by a separate thread simultaneously.

Introducing flatMap()

flatMap() maps each item emitted by an Observable to an inner Observable, to which you can apply RxJava operators of its own, including subscribeOn() to assign a new Scheduler. The inner Observables are then merged back into a single Observable, so their results arrive in no particular order.

To make things more realistic, let us pretend that the transformation of each item takes up to 3 seconds to complete. The following two things should hold true:

  1. Each item is processed by its own thread
  2. Due to the random time it takes to process each item, the order in which the items complete is not guaranteed.
Using flatMap()
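A sketch of the flatMap() version, assuming RxJava 2 and the same assumed input strings. To keep it quick, the simulated transformation sleeps a random time of up to 300 ms rather than the 3 seconds in the text; slowLength() and FlatMapDemo are illustrative names.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

class FlatMapDemo {

    // Simulated slow transformation: sleeps a random time (up to 300 ms here) first.
    static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(300));
        return s.length();
    }

    // Processes every item on its own thread; results arrive in completion order.
    static List<Integer> lengths(Set<String> workerThreads) {
        return Observable.just("long", "longer", "longest")
                .flatMap(s -> Observable.just(s)
                        .subscribeOn(Schedulers.newThread()) // a new thread per inner Observable
                        .doOnNext(item -> workerThreads.add(Thread.currentThread().getName()))
                        .map(FlatMapDemo::slowLength))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        System.out.println("lengths " + lengths(threads) + " computed on " + threads);
    }
}
```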

This will result in the following output:

processing item on thread RxNewThreadScheduler-3
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 7 on thread RxNewThreadScheduler-3
received item length 4 on thread RxNewThreadScheduler-1
received item length 6 on thread RxNewThreadScheduler-2

Notice that a) each item was processed by a separate thread and b) the order of the elements after the transformation is not guaranteed; it depends on which thread finishes first. So flatMap() worked exactly as we expected.

What if you need to preserve the order of the resulting items?

Introducing concatMap()

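concatMap() is similar to flatMap() but guarantees that the order of the processed items is the same as in the original emission. It does this by subscribing to each inner Observable only after the previous one has completed, so the items are no longer processed in parallel.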

Using concatMap()
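The same sketch with concatMap() in place of flatMap(), under the same assumptions (RxJava 2, a simulated delay of up to 300 ms, illustrative names). Because each inner Observable runs only after the previous one completes, the total time is roughly the sum of the individual delays.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

class ConcatMapDemo {

    // Simulated slow transformation: sleeps a random time (up to 300 ms here) first.
    static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(300));
        return s.length();
    }

    // Inner Observables run one after another, so results keep the source order.
    static List<Integer> lengths(Set<String> workerThreads) {
        return Observable.just("long", "longer", "longest")
                .concatMap(s -> Observable.just(s)
                        .subscribeOn(Schedulers.newThread()) // still a new thread per item
                        .doOnNext(item -> workerThreads.add(Thread.currentThread().getName()))
                        .map(ConcatMapDemo::slowLength))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        System.out.println("lengths " + lengths(threads) + " computed on " + threads);
    }
}
```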

This results in the following output:

processing item on thread RxNewThreadScheduler-1
received item length 4 on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 6 on thread RxNewThreadScheduler-2
processing item on thread RxNewThreadScheduler-3
received item length 7 on thread RxNewThreadScheduler-3

Note that the items are returned in the same order as in the original stream.

subscribeOn() gotchas

As seen above, subscribeOn() changes the thread on which our Observable is emitted and transformed. In the absence of observeOn(), the results of the stream processing are sent to the thread that did the work (thread specified in subscribeOn()). For instance, if we have subscribeOn(Schedulers.computation()) and observeOn() is not specified, the results are dispatched to the Computation thread as well.

It does not matter where you put the subscribeOn() operator within your chain: it still determines the thread on which the Observable emits.
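A quick way to check this is to place subscribeOn() right after the source in one chain and at the very end in another, then compare the emitting thread. A sketch assuming RxJava 2; the class and method names are illustrative.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class SubscribeOnPlacement {

    // subscribeOn() placed right after the source.
    static String emittedOnWhenFirst() {
        return Observable.just("long")
                .subscribeOn(Schedulers.computation())
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // subscribeOn() placed as the last operator in the chain.
    static String emittedOnWhenLast() {
        return Observable.just("long")
                .map(s -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("subscribeOn first: " + emittedOnWhenFirst()); // RxComputationThreadPool-N
        System.out.println("subscribeOn last:  " + emittedOnWhenLast());  // RxComputationThreadPool-N
    }
}
```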

If you specify multiple subscribeOn() operators in your chain, only the one closest to the source (the first one in the chain) determines where items are emitted; the following ones have no visible effect.

Here is an example:

Using multiple subscribeOn()
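A sketch of a chain with two subscribeOn() calls, assuming RxJava 2 and the same assumed input strings; MultipleSubscribeOn is an illustrative name. The helper records which thread doOnNext() ran on for each item.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MultipleSubscribeOn {

    // Returns the thread on which each item was emitted and processed.
    static List<String> processingThreads() {
        List<String> threads = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> threads.add(Thread.currentThread().getName()))
                .subscribeOn(Schedulers.computation()) // closest to the source: decides the emission thread
                .map(String::length)
                .subscribeOn(Schedulers.newThread())   // only hands off the subscription; emits no items
                .blockingSubscribe(length -> System.out.println("item length " + length));
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("processed on " + processingThreads());
    }
}
```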

This will result in the following output:

processing item on thread RxComputationThreadPool-1
item length 4 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 6 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 7 received on RxComputationThreadPool-1

Note that the Schedulers.computation() thread pool above did the work, while Schedulers.newThread() never processed any items. This is because the computation Scheduler is closest to the source, so the subsequent subscribeOn() had no effect on where items were emitted.

Default Schedulers

Some libraries and operators choose a Scheduler internally to enforce which thread does the background work. For instance, Observable.delay() from the RxJava library emits on the computation Scheduler by default, and any subscribeOn() you specify will not change that. However, you can use an overload of the operator that takes a Scheduler argument to pass a custom Scheduler of your choice.

delay() API and use of Schedulers

Pro-tip: RxLint can warn you when you use an operator such as delay() without overriding its default Scheduler.

What this also means is that when you use an operator such as delay() or interval(), you may be moving work onto another thread (by default, the computation pool) without realizing it. Always review the Javadoc to figure out the optimal usage; in particular, pay attention to the @SchedulerSupport annotation.
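A sketch contrasting the default delay(long, TimeUnit) with the overload delay(long, TimeUnit, Scheduler), assuming RxJava 2. The first chain also applies subscribeOn(Schedulers.newThread()) to show that it does not change where delay() emits; DelayDemo is an illustrative name.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

class DelayDemo {

    // delay(long, TimeUnit) emits on Schedulers.computation() by default.
    static String defaultDelayThread() {
        return Observable.just("long")
                .subscribeOn(Schedulers.newThread()) // does not change where delay() emits
                .delay(100, TimeUnit.MILLISECONDS)
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // The overload delay(long, TimeUnit, Scheduler) lets you pick the Scheduler.
    static String customDelayThread() {
        return Observable.just("long")
                .delay(100, TimeUnit.MILLISECONDS, Schedulers.single())
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("default delay(): " + defaultDelayThread()); // RxComputationThreadPool-N
        System.out.println("custom delay():  " + customDelayThread());  // RxSingleScheduler-N
    }
}
```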

onError()

Finally, when subscribeOn() is used but no onError handler is passed to subscribe(), an error will be thrown on the subscribed Scheduler’s thread, and its stack trace will have no reference to the place where you subscribed. This makes debugging extremely hard. To avoid the issue, always supply an onError handler.
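A sketch of passing an onError handler to subscribe(), assuming RxJava 2. The failure on "longest" is simulated, and the CountDownLatch only exists so this JVM demo can wait for the background chain; OnErrorDemo is an illustrative name.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OnErrorDemo {

    // Runs a chain that fails on "longest" and returns the message seen by onError.
    static String errorMessage() {
        AtomicReference<String> message = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.io())
                .map(s -> {
                    if (s.length() > 6) {
                        throw new IllegalArgumentException("too long: " + s); // simulated failure
                    }
                    return s.length();
                })
                .subscribe(
                        length -> System.out.println("item length " + length),
                        error -> {                      // onError handler, runs on the io thread
                            message.set(error.getMessage());
                            done.countDown();
                        },
                        done::countDown);               // onComplete (not reached in this example)
        try {
            done.await(5, TimeUnit.SECONDS); // wait for the background chain to terminate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return message.get();
    }

    public static void main(String[] args) {
        System.out.println("onError received: " + errorMessage());
    }
}
```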

observeOn() gotchas

It’s important to remember that observeOn() applies to all downstream operators (operators listed below observeOn()).

For instance, if observeOn() is placed above map(String::length) and filter(length -> length == 6), it applies to both of those operators.
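A sketch of such a chain, assuming RxJava 2 and using Schedulers.computation() as the observeOn() target so it runs on a plain JVM. Each operator logs "operator@thread" entries; ObserveOnDownstream is an illustrative name.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ObserveOnDownstream {

    // Returns "operator@thread" entries for map() and filter(), both below observeOn().
    static List<String> downstreamThreads() {
        List<String> log = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation()) // applies to every operator below
                .map(s -> {
                    log.add("map@" + Thread.currentThread().getName());
                    return s.length();
                })
                .filter(length -> {
                    log.add("filter@" + Thread.currentThread().getName());
                    return length == 6;
                })
                .blockingSubscribe(length -> System.out.println("kept length " + length));
        return log;
    }

    public static void main(String[] args) {
        downstreamThreads().forEach(System.out::println);
    }
}
```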

Be careful where you put the observeOn() operator because it changes the thread performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.

For instance, let’s look at the following RxJava chain which makes an HTTP network call:

observeOn() before map()

There is no reason to apply the observeOn() operator above the map() operator. In fact, on Android this code will throw NetworkOnMainThreadException, because map() would read the HTTP response on the main thread. We do not want to read from the HTTP response on the main thread; that should happen before we switch back to it, so observeOn() belongs after map().
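A sketch of the corrected ordering, assuming RxJava 2. readResponseBody() is a hypothetical stand-in for a blocking HTTP read (it makes no network call, it only records the calling thread), the URL is a placeholder, and Schedulers.single() stands in for AndroidSchedulers.mainThread() on a plain JVM.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class NetworkChainOrder {

    // Hypothetical stand-in for a blocking HTTP read; it only records the calling thread.
    static String readResponseBody(String url, List<String> readThreads) {
        readThreads.add(Thread.currentThread().getName());
        return "response from " + url;
    }

    static String fetch(List<String> readThreads, List<String> resultThreads) {
        return Observable.just("https://example.com/items")     // placeholder URL
                .subscribeOn(Schedulers.io())                    // network work on the io pool
                .map(url -> readResponseBody(url, readThreads))  // blocking read stays on io
                .observeOn(Schedulers.single())                  // switch last; on Android: AndroidSchedulers.mainThread()
                .doOnNext(body -> resultThreads.add(Thread.currentThread().getName()))
                .blockingFirst();
    }

    public static void main(String[] args) {
        List<String> reads = new CopyOnWriteArrayList<>();
        List<String> results = new CopyOnWriteArrayList<>();
        System.out.println(fetch(reads, results) + " (read on " + reads + ", delivered on " + results + ")");
    }
}
```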

Be wary of multiple observeOn()

You can have multiple observeOn() operators. Each observeOn() switches the thread for the operators below it, so a later observeOn() overrides an earlier one for everything downstream of it.

Here is an example:

Using multiple observeOn()
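A sketch of a chain with two observeOn() calls, assuming RxJava 2 and the same assumed input strings; MultipleObserveOn is an illustrative name. Each stage logs its label and thread so the hand-offs are visible.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class MultipleObserveOn {

    // Returns "stage: thread" entries showing where each stage ran.
    static List<String> stages() {
        List<String> log = new CopyOnWriteArrayList<>();
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> log.add("first doOnNext: " + Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation()) // applies below, until the next observeOn()
                .doOnNext(s -> log.add("second doOnNext: " + Thread.currentThread().getName()))
                .observeOn(Schedulers.io())          // overrides computation() for everything below
                .map(String::length)
                .doOnNext(length -> log.add("received item length " + length + ": "
                        + Thread.currentThread().getName()))
                .blockingSubscribe();
        return log;
    }

    public static void main(String[] args) {
        stages().forEach(System.out::println);
    }
}
```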

The output is as follows:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

As a final note, I would recommend that you avoid this kind of complexity if at all possible. Doing so will make it significantly easier to debug and maintain this code in the future. If you are not convinced, check out Dan Lew’s podcast listed under Recommended resources below.

Recommended resources

  1. Reactive Programming on Android with RxJava by Chris Arriola
  2. Articles by Thomas Nield on RxJava and maximizing parallelization (part 1, part 2 and part 3)
  3. RxJava: subscribeOn vs observeOn by Tomek Polański
  4. Dan Lew’s Fragmented Podcast talk on the importance of keeping it simple with RxJava
  5. Exploring RxJava 2 for Android talk by Jake Wharton

Thanks to Alex Hart for his input with this article.