State propagation in Android with RxJava Subjects
When working in Android projects, it is quite common to have different pieces of your application that need to react or get updated when something occurs somewhere within your UI or application. This problem, that in principle may look quite simple, quickly gets out of hands.
A common solution: Event Buses
A very common approach is to use an Event Bus (Otto or EventBus are good options in Android). In this approach, the component that notices the action will post an event that other components needs to react to. Code looks something like this:
// At the source
bus.post(new AvailableEvent(42));
Somewhere else…
public somewhere() {
// blabla
bus.register(this);
}@Subscribe
public void onAvailableEvent(AvailableEvent event) {
// React to the event somehow!
}
This approach has worked fine for me in the past but I was never fully happy with it. In my experience this approach can get quickly out of hands and is error prone, producing the so called event-madness: Events send multiple times from different sources, events that nobody listen for, are never posted, or get lost due to UI not being subscribed at the moment of posting.
And for me the more important handicap, the traceability of the code. At the moment of writing the code everything is clear and looks like a clean solution, but come back a week later and you will be scratching your head about who is creating the event and under which conditions. This is especially problematic in large code bases with several people contributing or when multiple events are linked together.
Broadcast Receivers
An alternative to event buses are Broadcast Receivers. They follow a similar idea and have the same problems as buses, but are way more cumbersome and require your payloads to work on Intents (extras keys, Parcelable interfaces, etc.) -.- Lets move on…
In the search of something better: Using Subjects to propagate state
In this post I want to explore how we can use RxJava Subjects to achieve the functionality required, overcoming some of the issues appearing in other solutions. But before building the components that help us share state across the app, lets see what Subjects are and how to work with them.
Note: I will not be covering the basics of RxJava in this blog, so a familiarity with the core concepts is recommended.
A (very) quick intro to Subjects in RxJava
I do not pretend to give a full explanation about RxJava Subjects, you can find proper definition in the official docs and this great blog post from David Karnok.
But in a nutshell, Subjects is a sort of bridge or proxy that acts both as an Observer and as an Observable. That means that it is an object that clients can chain onto it (Observable), and can also emit values and terminal events as well (Observer). In particular we are gonna pay special attention to two of the available subjects: BehaviorSubject and PublishSubject.
Thread Safety
When working with Subjects, you need to be aware of who is pushing content and from where. In our case, we may expect updates to come from different threads. And here is the problem, all Subjects are NOT thread safe, except SerializedSubject. In order to solve our problem and convert a subject in a thread safe version, we can call Subject#toSerialized()
For an in-deep explanation about thread safety issues in Subjects check this post by Artem Zinnatullin.
Going a step further: RxRelay
Subjects are stateful in a dangerous way: when they receive an onComplete or onError they no longer become usable for sharing data. Jake Wharton created a library to solved this problem RxRelay. From the repo README: A Relay is a Subject without the ability to call onComplete or onError. This makes them perfectly suitable for our task, sharing data. It contains equivalent construct for each of the Subjects: BehaviourRelay, PublishRelay and ReplayRelay. There is no version of AsyncSubject since there is no termination events on Relays.
A Relay is a Subject except without the ability to call onComplete or onError
A first approach: RxBus
Our first solution would be based on PublishSubject. We can implement a bus relatively easily with one of them.. There are several blog posts out there explaining the approach (post1, post2). Here is a simple implementation using Relays:
Implementation Note 1: upon registration we pass the event class to be notified, in this way we are going to notify only events for the expected class.
This basically is an implementation of an Event Bus in RxJava and, in my opinion, it does not provide much advantages above a common bus. Surely it improves the code traceability and brings the RxJava goodies (composing operations, thread control, etc.), but still, it looks like we could improve it.
A better option: the Store component
We can consider a Store like an event bus for a single resource. Store name is inspired by Flux architecture and the Store concept in there. But unlike the Store in Flux, here it is used only to keep state, no logic.
We can consider a Store like an event bus for a single resource.
This idea brings a few benefits over a generic bus.
- Single source of truth. Since we are using a single resource, we can use a BehaviourRelay under the hood instead of a PublishSubject, making the last state always available as soon as someone subscribes. This allows us to know the resource state at any given moment.
- Scalability and flexibility. A better separation of concerns is created since you are gonna create several Stores for several resources, having the possibility to include specific requirements in each of them. Eg: we could have Stores with different scopes — Singleton vs Screen.
- Initial state. A BehaviorSubject supports a default initial value, which can be really useful in different scenarios.
- Increased code traceability. Since we are having different Stores, it is clear who and under which conditions the Store is updated.
The implementation looks like this
Implementation Note 1: Exposing the Subject. We mask the Subject by calling .asObservable() method, which will return the Subject as a pure Observable without exposing its Subscriber interface.
Implementation Note 2: We use .distinctUntilChanged() so we avoid notifying twice the same state.
A practical example: Shopping Cart
I have built an small demo application to illustrate the concept. The app is a simulation of a Grocery-Store. Within the app we are gonna use our Store concept for sharing the Cart resource across the application. Full source code can be found in the following Github repo.
The application contains two main screens: a Catalog and a Cart screen.

The idea is that every time we add/remove a product from the Cart, the Cart-Screen and the Cart-Tab-Counter would be updated.
For this we are gonna create our CartStore object, which extends the previously introduced Store class. We do this to ease injection and code readability.
public class CartStore extends Store<Cart> { }
Then, in our UI, our components will subscribe to the CartStore in order to get updates. Here is how the Cart-Tab-Counter implementation may looked like:
cartStore.observe()
.subscribeOn(Schedulers.computation())
.map(cart -> {
int cartProductsCount = 0;
for (CartProduct product : cart.products) {
cartProductsCount += product.quantity;
}
return cartProductsCount;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(count -> updateTabTitle(count))
As we can see, this subscriber listens for Cart updates, collects the product count within the Cart and then updates the title.
Finally, we need to make sure that our CartStore gets updated every time that we modify the Cart content. Lets take a look how the CartService may look for the addToCart operation:
public class CartService {
private final CartDataSource cartDataSource;
private final CartStore cartStore;
// Constructor & others...
public Observable<Cart> addProduct(CartProduct cartProduct) {
return cartDataSource.addProduct(cartProduct)
.doOnNext(cart -> cartStore.publish(cart));
}
}public interface CartDataSource {
Observable<Cart> addProduct(CartProduct cartProduct);
}
As we can see, the CartService talks to the CartDataSource and once the data is updated, it publishes the new content on the CartStore, making it available to other subscribers. This creates a single point of change, which is easier to test and reason about.
The overall process can be visualized in the following schema:

Ok… so should I use the Store for everything?
Well my recommendation would be to use the Store for the most important resources in your application, like a shopping-cart, favorite-list, user status or similar, but still keep the Bus in case you need to propagate lesser important events.
Use Store for your more important resources, but still keep the Bus for smaller events if needed.
Conclusion
In this post we cover the problem that may arise using standard Event Buses for sharing state and how RxJava Subjects can provide a better solution to this problem. We also cover two different solutions to the same problem and the use cases for each of them.
I hope you enjoy this approach. Feel free to contribute and share your thoughts in the comments.
Happy coding :)