Understanding RxJava subscribeOn and observeOn

One of the biggest strengths of RxJava is its ability to easily schedule work and process results on various threads. This article aims to give you a solid foundation of working with threads in RxJava and RxAndroid to optimize system performance while avoiding bugs (threading-related bugs are notoriously hard to track down).
RxJava Schedulers
Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it will take a thread from its pool and run the task in that thread.
Let’s summarize the available Scheduler types and their common uses:
- Schedulers.io() is backed by an unbounded thread pool. It is used for non-CPU-intensive I/O work, including interaction with the file system, network calls, database interactions, etc. This thread pool is intended for asynchronously performing blocking I/O.
- Schedulers.computation() is backed by a bounded thread pool with size up to the number of available processors. It is used for computational or CPU-intensive work such as resizing images, processing large data sets, etc. Be careful: when you allocate more computation threads than available cores, performance will degrade due to context switching and thread creation overhead as threads vie for processors’ time.
- Schedulers.newThread() creates a new thread for each unit of work scheduled. This scheduler is expensive because a new thread is spawned every time and no reuse happens.
- Schedulers.from(Executor executor) creates and returns a custom scheduler backed by the specified executor. To limit the number of simultaneous threads in the thread pool, use Schedulers.from(Executors.newFixedThreadPool(n)). This guarantees that if a task is scheduled when all threads are occupied, it will be queued. The threads in the pool will exist until it is explicitly shut down.
- Main thread or AndroidSchedulers.mainThread() is provided by the RxAndroid extension library to RxJava. The main thread (also known as the UI thread) is where user interaction happens. Care should be taken not to overload this thread to prevent a janky, non-responsive UI or, worse, an “Application Not Responding” (ANR) dialog.
- Schedulers.single() is new in RxJava 2. This scheduler is backed by a single thread executing tasks sequentially in the order requested.
- Schedulers.trampoline() executes tasks in a FIFO (First In, First Out) manner by one of the participating worker threads. It’s often used when implementing recursion to avoid growing the call stack.
WARNING: Be careful writing multi-threaded code using unbounded thread Schedulers such as Schedulers.io() and Schedulers.newThread(). Depending on your data stream and the transformations you apply to it, it’s easier than you think to flood your system with threads.
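One way to keep the thread count bounded is the Schedulers.from(Executors.newFixedThreadPool(n)) approach from the list above. The following is a minimal sketch, assuming RxJava 2.x is on the classpath (for 3.x the packages would be io.reactivex.rxjava3.*); the class name, pool size and the 50 ms sleep are illustrative choices only.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: runs several blocking tasks on a fixed-size pool.
final class BoundedSchedulerSketch {

    /** Runs taskCount tasks on at most poolSize threads; returns the worker thread names used. */
    static Set<String> run(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Scheduler bounded = Schedulers.from(executor);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            Observable.range(1, taskCount)
                    .flatMap(i -> Observable.fromCallable(() -> {
                                threads.add(Thread.currentThread().getName());
                                Thread.sleep(50); // stand-in for blocking work
                                return i;
                            })
                            .subscribeOn(bounded)) // tasks beyond poolSize wait in the executor's queue
                    .blockingSubscribe();          // block the caller until every task has finished
        } finally {
            executor.shutdown(); // pool threads live until the executor is shut down
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 6));
    }
}
```

However many tasks are submitted, the set of worker thread names never grows beyond the pool size.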
Default threading in RxJava
If you don’t specify threading in RxJava (if you don’t specify subscribeOn, observeOn or both), the data will be emitted and processed by the current scheduler/thread (usually the main thread). For instance, all operators in the chain below will be processed by the current thread.
Note: some operators, such as interval, operate on a computation thread by default. See below for more details.
We can specify a thread to execute any operator by using subscribeOn and/or observeOn.
- subscribeOn affects upstream operators (operators above the subscribeOn)
- observeOn affects downstream operators (operators below the observeOn)
- If only subscribeOn is specified, all operators will be executed on that thread
- If only observeOn is specified, all operators will be executed on the current thread and only operators below the observeOn will be switched to the thread specified by the observeOn
For instance, in the following chain:
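A minimal sketch of such a chain, assuming RxJava 2.x on the classpath. The sample strings, the filter condition and the class name are hypothetical; the thread names are recorded rather than printed so they can be inspected.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: records which thread ran map and which ran filter.
final class UpstreamDownstreamSketch {

    /** Returns {thread that ran map, thread that ran filter}. */
    static String[] run() {
        AtomicReference<String> mapThread = new AtomicReference<>();
        AtomicReference<String> filterThread = new AtomicReference<>();

        Observable.just("long", "longer", "longest")
                .map(s -> {
                    mapThread.set(Thread.currentThread().getName());
                    return s.length();
                })
                .subscribeOn(Schedulers.io())          // source and map run on an io thread
                .observeOn(Schedulers.computation())   // everything below runs on a computation thread
                .filter(length -> {
                    filterThread.set(Thread.currentThread().getName());
                    return length > 4;
                })
                .blockingSubscribe();                  // wait for completion on the calling thread

        return new String[] {mapThread.get(), filterThread.get()};
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("map ran on " + threads[0] + ", filter ran on " + threads[1]);
    }
}
```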
The data emission (just) and the map operator will be executed on the io scheduler as directed by the upstream operator subscribeOn.
filter will be executed on the computation scheduler as directed by the downstream operator observeOn.
Read on for more details, ways to debug as well as nuances of the threading operator in RxJava.
Debugging threading
As before, let’s look at a basic RxJava chain where we emit Strings and calculate their lengths.
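A chain of this shape could look like the sketch below, assuming RxJava 2.x on the classpath. The three strings are an assumption, chosen only because their lengths are 4, 6 and 7; the class name is hypothetical.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: emits three strings and reports their lengths.
final class ItemLengths {

    static List<String> run() {
        List<String> lines = new ArrayList<>();
        Observable.just("long", "longer", "longest")
                .map(String::length)
                .subscribe(length -> lines.add("item length " + length)); // runs synchronously on the caller
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```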
When executed, this will print:
item length 4
item length 6
item length 7
Now, let’s see what thread this work is being done on by printing out thread info in doOnNext(), a side effect operator that gets executed for each item emitted.
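As a sketch, assuming RxJava 2.x and the same hypothetical sample strings, the doOnNext() call can be added ahead of map. With no Scheduler specified, every line is produced on the thread that calls subscribe().

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: logs the thread each item is processed on.
final class ThreadInfoSketch {

    static List<String> run() {
        List<String> lines = new ArrayList<>();
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> lines.add("processing item on thread " + Thread.currentThread().getName()))
                .map(String::length)
                .subscribe(length -> lines.add("item length " + length));
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```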
When executed, this will print:
processing item on thread main
item length 4
processing item on thread main
item length 6
processing item on thread main
item length 7
So this stream is being emitted and processed on the main thread, which makes sense because the block of code above resides inside the main method of my class.
Doing work on background thread
Often it makes sense to delegate certain work to a background thread. A typical example would be offloading an IO operation from the main thread. RxJava makes it easy.
The subscribeOn() operator tells the source Observable which thread to emit and push items on all the way down to the Observer (hence, it affects both upstream and downstream operators). It does not matter where you put the subscribeOn() in your Observable chain of operators.
Things to remember about our Observable are:
- It is a “cold” Observable, which means the emission occurs lazily, only when a Subscriber is added (a call to subscribe() is made). A new emission will happen for each subscriber added.
- subscribeOn() specifies a Scheduler (thread pool) where the work will be performed after the subscription is made in subscribe().
- The results of the transformation are received on the same thread as the thread that did the actual work. This can be changed using observeOn(), as we’ll see soon.
Let’s run the updated code example inside the main method.
When executed, this will print nothing!
This is because the main method finished executing before the background thread returned results. To get around this, let’s keep the main method alive for an additional 3 seconds with Thread.sleep(3000), long enough to give our Observable a chance to fire emissions on the background thread.
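The sketch below shows the same idea, assuming RxJava 2.x and hypothetical sample strings. Note one deliberate substitution: instead of a fixed Thread.sleep(3000), it waits on a CountDownLatch (with a 3 second cap) so the caller resumes as soon as the stream completes.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: moves emission and processing to a background thread.
final class BackgroundWorkSketch {

    static List<String> run() {
        List<String> lines = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just("long", "longer", "longest")
                .doOnNext(s -> lines.add("processing item on thread " + Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread())
                .map(String::length)
                .subscribe(
                        length -> lines.add("item length " + length
                                + " received on " + Thread.currentThread().getName()),
                        error -> done.countDown(),
                        done::countDown);

        try {
            done.await(3, TimeUnit.SECONDS); // keep the caller alive until the stream finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```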
When executed, this will print:
processing item on thread RxNewThreadScheduler-1
item length 4 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 6 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 7 received on RxNewThreadScheduler-1
The results of the background thread work are returned on the same thread, RxNewThreadScheduler-1.
When performing network, I/O or computation tasks, using a background scheduler is crucial. Without subscribeOn(), your code will use the caller’s thread to perform the operations, causing the Observable to become blocking.
Understanding observeOn()
As we saw above, subscribeOn() instructs the source Observable which thread to emit items on, and this thread will push the emissions all the way to our Observer. However, if it encounters an observeOn() anywhere in the chain, it will switch and pass emissions using that Scheduler for the remaining (downstream) operations.
Usually the observing thread in Android is the main (UI) thread, AndroidSchedulers.mainThread(). This requires the RxAndroid extension library to RxJava.
Let’s modify our example code to perform background work on Schedulers.newThread() but then switch to AndroidSchedulers.mainThread().
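AndroidSchedulers.mainThread() needs an Android runtime, so the sketch below uses Schedulers.single() as a stand-in for the observing thread; that substitution, the sample strings and the class name are assumptions. On Android you would pass AndroidSchedulers.mainThread() to observeOn() instead. RxJava 2.x is assumed.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: background work on one thread, results observed on another.
final class ObserveOnSketch {

    static List<String> run() {
        // Stand-in for AndroidSchedulers.mainThread(), which is only available on Android.
        Scheduler observing = Schedulers.single();
        List<String> lines = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just("long", "longer", "longest")
                .doOnNext(s -> lines.add("processing item on thread " + Thread.currentThread().getName()))
                .subscribeOn(Schedulers.newThread()) // emission and doOnNext run here
                .observeOn(observing)                // map and the subscriber run on the observing thread
                .map(String::length)
                .subscribe(
                        length -> lines.add("item length " + length
                                + " received on thread " + Thread.currentThread().getName()),
                        error -> done.countDown(),
                        done::countDown);

        try {
            done.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```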
When executed, we will see that now results are received by the main thread.
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 4 received on thread main
item length 6 received on thread main
item length 7 received on thread main
Doing work asynchronously
While RxJava is known as a library for composing asynchronous and event-based programs using observable sequences, there are plenty of useful tasks it can do synchronously. For instance, map(String::length) above handles each item sequentially on the same thread, RxNewThreadScheduler-1, preserving the original order.
Simply using subscribeOn() at the start of an Observable chain means the process is still operating on a single thread and emitting items synchronously downstream. However, when you start combining different streams on different threads or use operators such as observeOn(), interval() or delay(), your Observable chain is no longer synchronous.
Now, let’s see how the example above can be modified so that each item emitted is processed by a separate thread simultaneously.
Introducing flatMap()
flatMap() wraps each item being emitted by an Observable, letting you apply its own RxJava operators, including assigning a new Scheduler using subscribeOn() to handle those operators. As the items inside flatMap() are processed, the individual Observables are merged back into a single Observable in no particular order.
To make things more realistic, let us pretend that a transformation for each item takes up to 3 seconds to complete. The following 2 things should hold true:
- Each item is processed by its own thread
- Due to random time it takes to process each item, the order of the items completed is not guaranteed.
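A minimal sketch of this setup, assuming RxJava 2.x. To keep it quick to run, the simulated transformation sleeps for a random time under 300 ms rather than up to 3 seconds, and it collects results instead of printing them; those choices, the sample strings and the class name are assumptions.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical demo class: processes each item concurrently on its own thread.
final class FlatMapSketch {

    /** Returns the lengths in completion order; worker thread names are added to workerThreads. */
    static List<Integer> run(Set<String> workerThreads) {
        return Observable.just("long", "longer", "longest")
                .flatMap(s -> Observable.just(s)
                        .doOnNext(item -> workerThreads.add(Thread.currentThread().getName()))
                        .map(FlatMapSketch::slowLength)
                        .subscribeOn(Schedulers.newThread())) // one new thread per inner Observable
                .toList()
                .blockingGet(); // wait until every inner Observable has completed
    }

    // Simulates a slow transformation with a random duration.
    private static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(300));
        return s.length();
    }

    public static void main(String[] args) {
        Set<String> workers = ConcurrentHashMap.newKeySet();
        System.out.println(run(workers) + " processed on " + workers);
    }
}
```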
This will result in the following output:
processing item on thread RxNewThreadScheduler-3
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 7 on thread RxNewThreadScheduler-3
received item length 4 on thread RxNewThreadScheduler-1
received item length 6 on thread RxNewThreadScheduler-2
Notice that a) each item was processed by a separate thread and b) the order of the elements after the transformation is random. So flatMap() worked exactly as we expected.
What if you need to preserve the order of the resulting items?
Introducing concatMap()
concatMap() is similar to flatMap() but guarantees that the order of the items processed is the same as in the original emission.
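The ordering guarantee can be sketched as follows, assuming RxJava 2.x. As with any illustration here, the sample strings, the shortened random delay and the class name are assumptions. Because concatMap() subscribes to one inner Observable at a time, the items are processed one after another rather than simultaneously.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical demo class: processes items on separate threads but keeps source order.
final class ConcatMapSketch {

    /** Returns the lengths in emission order; worker thread names are added to workerThreads. */
    static List<Integer> run(Set<String> workerThreads) {
        return Observable.just("long", "longer", "longest")
                .concatMap(s -> Observable.just(s)
                        .doOnNext(item -> workerThreads.add(Thread.currentThread().getName()))
                        .map(ConcatMapSketch::slowLength)
                        .subscribeOn(Schedulers.newThread())) // next inner starts only after the previous completes
                .toList()
                .blockingGet();
    }

    // Simulates a slow transformation with a random duration.
    private static int slowLength(String s) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(300));
        return s.length();
    }

    public static void main(String[] args) {
        Set<String> workers = ConcurrentHashMap.newKeySet();
        System.out.println(run(workers) + " processed on " + workers);
    }
}
```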
This results in the following output:
processing item on thread RxNewThreadScheduler-1
received item length 4 on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 6 on thread RxNewThreadScheduler-2
processing item on thread RxNewThreadScheduler-3
received item length 7 on thread RxNewThreadScheduler-3
Note that the items are returned in the same order as in the original stream.
subscribeOn() gotchas
As seen above, subscribeOn() changes the thread on which our Observable is emitted and transformed. In the absence of observeOn(), the results of the stream processing are sent to the thread that did the work (the thread specified in subscribeOn()). For instance, if we have subscribeOn(Schedulers.computation()) and observeOn() is not specified, the results are dispatched to the Computation thread as well.
It does not matter where you put the subscribeOn() operator within your chain: it will still denote the thread on which the Observable will be emitted.
If you specify multiple subscribeOn() RxJava operators in your chain, only the first one (the one closest to the source) will be used and the following ones will be ignored, unless the subscribeOn() is used inside flatMap() as seen above.
Here is an example:
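A sketch consistent with that rule, assuming RxJava 2.x; the sample strings and class name are hypothetical. It records the emitting thread instead of printing it.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: two subscribeOn() calls in one chain.
final class MultipleSubscribeOnSketch {

    /** Returns the name of the thread the items were emitted on. */
    static String emittingThread() {
        AtomicReference<String> thread = new AtomicReference<>();
        Observable.just("long", "longer", "longest")
                .doOnNext(s -> thread.set(Thread.currentThread().getName()))
                .map(String::length)
                .subscribeOn(Schedulers.computation()) // closest to the source: decides the emitting thread
                .subscribeOn(Schedulers.newThread())   // has no effect on where items are emitted
                .blockingSubscribe();
        return thread.get();
    }

    public static void main(String[] args) {
        System.out.println("processing item on thread " + emittingThread());
    }
}
```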
This will result in the following output:
processing item on thread RxComputationThreadPool-1
item length 4 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 6 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 7 received on RxComputationThreadPool-1
Note that the Schedulers.computation() thread pool above did the work while Schedulers.newThread() was never used. This is because the computation Scheduler was listed first and all subsequent subscribeOn() operators were simply ignored.
Default Schedulers
Some libraries specify subscribeOn() internally to enforce which thread does the background work. For instance, Observable.delay() from the RxJava library will emit on the Computation Scheduler by default. A subscribeOn() you specify on it will not change the thread the delayed items are emitted on. However, you can use an overloaded version of that operator instead to pass a custom Scheduler of your choice.
Pro-tip: RxLint can warn you when you use an operator such as delay() without overriding its default Scheduler.
What this also means is that when you use Scheduler-dependent operators such as delay(), interval(), etc. while using subscribeOn(), you may be spawning (but not using) a thread without realizing it. Always review the Javadoc for those operators to ensure optimal usage. In particular, pay attention to the @SchedulerSupport annotation.
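The difference between the default and the Scheduler-accepting overload of delay() can be sketched as below, assuming RxJava 2.x; the class name, the 100 ms delay and the single emitted value are illustrative only.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

// Hypothetical demo class: compares delay() with and without an explicit Scheduler.
final class DelaySchedulerSketch {

    /** delay() without a Scheduler: the item arrives on a computation thread despite subscribeOn(io). */
    static String defaultDelayThread() {
        return Observable.just("x")
                .delay(100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .map(x -> Thread.currentThread().getName())
                .blockingFirst();
    }

    /** delay() overload with a Scheduler: the item arrives on that Scheduler's thread. */
    static String customDelayThread() {
        return Observable.just("x")
                .delay(100, TimeUnit.MILLISECONDS, Schedulers.io())
                .map(x -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("default: " + defaultDelayThread());
        System.out.println("custom:  " + customDelayThread());
    }
}
```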
onError()
Finally, when subscribeOn() is used but onError() is not, if an error occurs, it will be thrown on the subscribed Scheduler thread but the error stacktrace will have no reference to the place where you subscribed. This will make debugging extremely hard. To avoid the issue, always supply an onError() handler.
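A minimal sketch of supplying an error handler, assuming RxJava 2.x. The failing source, the "boom" message and the class name are made up for illustration; the handler receives the error at the subscription site instead of letting it surface on the Scheduler thread.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: handles an error raised on a background Scheduler.
final class OnErrorSketch {

    /** Returns a description of the error delivered to the onError handler. */
    static String run() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.<Integer>fromCallable(() -> {
                    throw new IllegalStateException("boom"); // simulated failure in background work
                })
                .subscribeOn(Schedulers.io())
                .subscribe(
                        value -> done.countDown(),
                        error -> { // second argument is the onError handler
                            seen.set(error.getMessage() + " handled on " + Thread.currentThread().getName());
                            done.countDown();
                        });

        try {
            done.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```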
observeOn() gotchas
It’s important to remember that unlike subscribeOn(), placement of observeOn() matters. Switching scheduler with observeOn() applies to all downstream operators (operators listed below observeOn()).
For instance, in the following example, due to the observeOn() placement, map(String::length) and filter(length -> length == 6) will be executed on the main thread. Is this really what was intended?
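The effect of placement can be sketched as below, assuming RxJava 2.x. Since AndroidSchedulers.mainThread() only exists on Android, Schedulers.single() stands in for the main thread here; that stand-in, the sample strings and the class name are assumptions.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows which thread runs map depending on observeOn() placement.
final class ObserveOnPlacementSketch {

    /** Returns the thread that executed map for the given observeOn() placement. */
    static String mapThread(boolean observeEarly) {
        Scheduler observing = Schedulers.single(); // stand-in for AndroidSchedulers.mainThread()
        AtomicReference<String> thread = new AtomicReference<>();
        Observable<String> source = Observable.just("long", "longer", "longest")
                .subscribeOn(Schedulers.io());

        Observable<Integer> lengths;
        if (observeEarly) {
            // observeOn above map: map (and filter) run on the observing thread
            lengths = source.observeOn(observing).map(s -> lengthOn(thread, s));
        } else {
            // observeOn below map: map stays on the io thread
            lengths = source.map(s -> lengthOn(thread, s)).observeOn(observing);
        }
        lengths.filter(length -> length == 6).blockingSubscribe();
        return thread.get();
    }

    // Records the current thread name, then computes the length.
    private static int lengthOn(AtomicReference<String> thread, String s) {
        thread.set(Thread.currentThread().getName());
        return s.length();
    }

    public static void main(String[] args) {
        System.out.println("observeOn above map: " + mapThread(true));
        System.out.println("observeOn below map: " + mapThread(false));
    }
}
```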
Be careful where you put the observeOn() operator because it changes the Scheduler performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.
For instance, let’s look at the following RxJava chain which makes an HTTP network call:
There is no reason to have the observeOn() operator applied above the map() operator. In fact, this code will result in NetworkOnMainThreadException! We do not want to be reading from the HTTP response on the main thread. It should be done before we switch back to the main thread:
Be wary of multiple observeOn()
You can have multiple observeOn() operators. As operators are executed downstream, each observeOn() below will override the one above.
Here is an example:
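A chain that switches threads twice could be sketched like this, assuming RxJava 2.x. The exact operators are a guess at a chain that produces this kind of output: subscribeOn(Schedulers.newThread()) for the source, then observeOn() with the computation and io Schedulers. The sample strings and class name are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: each observeOn() switches the thread for the operators below it.
final class MultipleObserveOnSketch {

    static List<String> run() {
        List<String> lines = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Observable.just("long", "longer", "longest")
                .doOnNext(s -> lines.add("first doOnNext: processing item on thread " + threadName()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation()) // operators below run on a computation thread
                .map(String::length)
                .doOnNext(n -> lines.add("second doOnNext: processing item on thread " + threadName()))
                .observeOn(Schedulers.io())          // the subscriber runs on an io thread
                .subscribe(
                        n -> lines.add("received item length " + n + " on thread " + threadName()),
                        error -> done.countDown(),
                        done::countDown);

        try {
            done.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lines;
    }

    private static String threadName() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```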
The output is as follows:
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1
As a final note, I would recommend that you avoid this kind of complexity if at all possible. Doing so will make it significantly easier to debug and maintain this code in the future. If you are not convinced, check out Dan Lew’s podcast linked in the Resources section.
Recommended resources
- Reactive Programming on Android with RxJava by Chris Arriola
- Articles by Thomas Nield on RxJava and maximizing parallelization part1, part2 and part3
- Learning RxJava book by Thomas Nield
- Dan Lew’s Fragmented Podcast talk on the importance of keeping it simple with RxJava
- Exploring RxJava 2 for Android talk by Jake Wharton.
Thanks to Alex Hart for his input with this article.