Exploring RxJava in Android — Utility Operators

Anitaa Murthy
Published in ProAndroidDev
4 min read, Aug 19, 2018

This article is part of the RxJava Introduction series. You can check out the entire series here:

Utility Operators

Delay

This operator shifts the emissions from an Observable forward in time by a particular amount. That is, it modifies its source Observable so that each item is emitted the given amount of time later than it otherwise would be; the gaps between the items stay the same.

Sample Implementation: The code below demonstrates the delay() operator. It emits all of the Observable's items after a delay of 2 seconds.
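A minimal sketch of such a sample, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The class name DelayExample and the run() helper are illustrative and not part of the library.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class DelayExample {

    // Emits six letters, shifts every emission 2 seconds later, and returns what was received.
    static List<String> run() {
        return Observable.just("A", "B", "C", "D", "E", "F")
                .delay(2, TimeUnit.SECONDS)   // delay() uses the computation scheduler by default
                .doOnNext(item -> System.out.println("onNext: " + item))
                .toList()                     // collect the items into a Single<List<String>>
                .blockingGet();               // block so the JVM does not exit before the delay elapses
    }

    public static void main(String[] args) {
        run();
    }
}
```

Because delay() hands the items over to another thread, the sketch blocks with blockingGet(); without it, a plain main method could return before anything is printed.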

Output

onNext: A
onNext: B
onNext: C
onNext: D
onNext: E
onNext: F

Do

This operator registers an action to take upon a variety of Observable lifecycle events.

  • The doOnNext() operator modifies the Observable source so that it invokes an action each time onNext() is called.
  • The doOnComplete() operator (doOnCompleted() in RxJava 1.x) registers an action that is invoked when onComplete() is called.
  • The doOnEach() operator registers a callback that is invoked for every notification from the source: each emitted item as well as the terminal onError() or onComplete() event.
  • The doOnSubscribe() operator registers an action which is called whenever an Observer subscribes to the resulting Observable.

Sample Implementation: The code below demonstrates the doOnNext(), doOnEach(), doOnSubscribe() and doOnUnsubscribe() operators. Note that doOnUnsubscribe() is the RxJava 1.x name; RxJava 2.x calls it doOnDispose().

Output

doOnNext: A
doOnNext: B
doOnNext: C
doOnNext: D
doOnNext: E
doOnNext: F
---------------
onUnSubscribe is called
doOnEach: 1
doOnEach: 2
doOnEach: 3
doOnEach: 4
doOnEach: 5
Complete is called
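
The lifecycle hooks described above can also be combined in a single chain. The following is a minimal sketch, assuming RxJava 2.x names (so doOnComplete() rather than doOnCompleted()); the class name DoExample and the log list are illustrative. It records the order in which the callbacks fire rather than reproducing the output above.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class DoExample {

    // Runs a two-item chain synchronously and returns the order in which each hook fired.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("A", "B")
                .doOnSubscribe(disposable -> log.add("doOnSubscribe"))
                .doOnNext(item -> log.add("doOnNext: " + item))
                // doOnEach() receives a Notification for items and for terminal events alike
                .doOnEach(notification -> log.add(
                        notification.isOnNext()
                                ? "doOnEach: " + notification.getValue()
                                : "doOnEach: terminal event"))
                .doOnComplete(() -> log.add("doOnComplete"))
                .subscribe(item -> log.add("onNext: " + item));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each item passes through the hooks in chain order before it reaches the subscriber, and doOnEach() fires once more for the completion event, just before doOnComplete().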

Materialize/Dematerialize

This operator represents both the items emitted and the notifications sent as emitted items, or vice versa. materialize() converts every event from the source, whether an item, an error, or completion, into an emitted Notification object, so that we can inspect which of onNext(), onError(), or onComplete() it stands for. dematerialize(), as you might guess, reverses the effect and turns Notification objects back into the events they represent.

Sample Implementation: The code below demonstrates the materialize() operator. From materialize() we get a Notification object for each event. Using this object, we can check what kind of event it is: isOnNext(), isOnError(), or isOnComplete(). Here we keep the items that were emitted successfully and omit the error.
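A minimal sketch of such a sample, assuming RxJava 2.x. The class name MaterializeExample is illustrative, and the trailing error is a simulated failure added here so that there is something to filter out.

```java
import io.reactivex.Notification;
import io.reactivex.Observable;

import java.util.List;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class MaterializeExample {

    // Emits six letters followed by a simulated error, then keeps only the onNext notifications.
    static List<String> run() {
        return Observable.just("A", "B", "C", "D", "E", "F")
                .concatWith(Observable.<String>error(new IllegalStateException("simulated failure")))
                .materialize()                   // items, error and completion all become Notification items
                .filter(Notification::isOnNext)  // drop the onError notification
                .map(Notification::getValue)     // unwrap the original item
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", run()));
    }
}
```

After materialize(), the error travels as an ordinary item and the stream itself completes normally, which is why the chain can be collected without an error handler.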

Output

A B C D E F

ObserveOn

This operator specifies the Scheduler on which an Observer will observe this Observable. By default, an Observable and its operator chain run on the thread on which subscribe() is called. The observeOn() operator specifies a different Scheduler that the Observable uses for sending notifications to Observers, so it affects the operators and Observers that come after it in the chain.

Sample Implementation: The code below demonstrates the observeOn() operator.
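A minimal sketch of such a sample, assuming RxJava 2.x. The class name ObserveOnExample, the workerThread field, and the input strings are illustrative; the strings are chosen only so that their lengths run from 1 to 6.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class ObserveOnExample {

    // Name of the thread that executed the map() step, recorded for inspection.
    static volatile String workerThread;

    static List<String> run() {
        return Observable.just("a", "ab", "abc", "abcd", "abcde", "abcdef")
                .observeOn(Schedulers.computation())  // everything below runs on a computation thread
                .map(text -> {
                    workerThread = Thread.currentThread().getName();
                    return "2 * length of string: " + 2 * text.length();
                })
                .toList()
                .blockingGet();                       // wait for the background work to finish
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        System.out.println("map() ran on: " + workerThread);
    }
}
```

Only the operators placed after observeOn() switch threads; the source itself still emits on the thread that subscribed.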

Output

2 * length of string: 2
2 * length of string: 4
2 * length of string: 6
2 * length of string: 8
2 * length of string: 10
2 * length of string: 12

SubscribeOn

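This operator specifies the Scheduler on which the source Observable is subscribed to, and therefore the thread it uses for emitting items to the Observer (unless an observeOn() further down the chain switches threads).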

Sample Implementation: The code below demonstrates the subscribeOn() operator.
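A minimal sketch of such a sample, assuming RxJava 2.x. The class name SubscribeOnExample and the log list are illustrative. The thread number in the names may differ from run to run.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class SubscribeOnExample {

    // Emits 100 and 200 on an io() thread, multiplies them by 10, and logs the thread at each step.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Observable.<Integer>create(emitter -> {
                    for (int value : new int[] {100, 200}) {
                        log.add("Emitting " + value + " on thread " + Thread.currentThread().getName());
                        emitter.onNext(value);
                    }
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.io())  // the create() body above runs on an io() thread
                .map(value -> value * 10)
                .doOnNext(value ->
                        log.add("Received " + value + " on thread " + Thread.currentThread().getName()))
                .blockingSubscribe();          // wait until the stream has completed
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Since there is no observeOn() in the chain, the map() and doOnNext() steps stay on the same io() thread that the source emits on.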

Output

Emitting 100 on thread RxCachedThreadScheduler-1
Received 1000 on thread RxCachedThreadScheduler-1
Emitting 200 on thread RxCachedThreadScheduler-1
Received 2000 on thread RxCachedThreadScheduler-1

Note: When multiple subscribeOn() calls are used in succession, only the first one (the one closest to the source) takes effect.

TimeInterval

This operator converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions. That is, if we are more interested in how much time has passed since the last item than in the absolute moment at which each item was emitted, we can use the timeInterval() method.

Sample Implementation: The code below demonstrates the timeInterval() operator.
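A minimal sketch of such a sample, assuming RxJava 2.x. The class name TimeIntervalExample is illustrative, and the 100 ms period is an assumption; the measured intervals will vary slightly from run to run.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Timed;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class TimeIntervalExample {

    // Emits 0, 1, 2 roughly every 100 ms and pairs each value with the time since the previous one.
    static List<Timed<Long>> run() {
        return Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .timeInterval()  // wraps each item in a Timed<Long> holding the elapsed time
                .doOnNext(timed -> System.out.println("onNext: " + timed))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run();
    }
}
```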

Output

onNext: Timed[time=124, unit=MILLISECONDS, value=0]
onNext: Timed[time=100, unit=MILLISECONDS, value=1]
onNext: Timed[time=98, unit=MILLISECONDS, value=2]

Timeout

This operator mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items.

Sample Implementation: The code below demonstrates the timeout() operator. It delays the emission of items by 1 second, but the timeout signals an error if no item is emitted within 500 ms. Hence the code below terminates with an error.
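
A minimal sketch of such a sample, assuming RxJava 2.x. The class name TimeoutExample and the emitted letters are illustrative.

```java
import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class TimeoutExample {

    // Delays the items by 1 second but allows only 500 ms of silence; returns the resulting error.
    static Throwable run() {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Observable.just("A", "B", "C")
                .delay(1, TimeUnit.SECONDS)
                .timeout(500, TimeUnit.MILLISECONDS)  // no item within 500 ms leads to onError
                .blockingSubscribe(
                        item -> System.out.println("onNext: " + item),
                        error::set);                  // capture the error instead of rethrowing it
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println("onError: " + run());
    }
}
```

The error delivered to the Observer is a java.util.concurrent.TimeoutException, and no item is received because the timeout fires before the delayed items arrive.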

Timestamp

This operator attaches a timestamp to each item emitted by an Observable. It transforms the items into the Timed<T> type (Timestamped<T> in RxJava 1.x), which contains the original item along with a timestamp for when it was emitted.

Sample Implementation: The code below demonstrates the timestamp() operator.
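A minimal sketch of such a sample, assuming RxJava 2.x. The class name TimestampExample is illustrative, and the 100 ms period is an assumption; the timestamps depend on when the code is run.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Timed;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class TimestampExample {

    // Emits 0, 1, 2 roughly every 100 ms and pairs each value with its emission time.
    static List<Timed<Long>> run() {
        return Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .timestamp()  // wraps each item in a Timed<Long> holding the time in milliseconds
                .doOnNext(System.out::println)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run();
    }
}
```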

Output

Timed[time=1534671163644, unit=MILLISECONDS, value=0]
Timed[time=1534671163744, unit=MILLISECONDS, value=1]
Timed[time=1534671163845, unit=MILLISECONDS, value=2]

Using

This operator creates a disposable resource that has the same lifespan as the Observable. With using(), the resource is created when an Observer subscribes and is disposed of when the Observable terminates or the Observer disposes of the subscription.

Sample Implementation: The code below demonstrates the using() operator.
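A minimal sketch of such a sample, assuming RxJava 2.x. The class name UsingExample and the log list are illustrative, and a plain String stands in for a real resource such as a file or a connection.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x (io.reactivex) is available.
class UsingExample {

    // Creates a String "resource", emits its letters one by one, then disposes of the resource.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.<String, String>using(
                        () -> "Example",                                       // create the resource
                        resource -> Observable.fromArray(resource.split("")),  // build the Observable from it
                        resource -> log.add("Disposable: " + resource))        // release the resource
                .subscribe(letter -> log.add("onNext: " + letter));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The three arguments map to the three phases of the resource: creation on subscription, use while items are emitted, and disposal once the Observable terminates.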

Output

onNext: E
onNext: x
onNext: a
onNext: m
onNext: p
onNext: l
onNext: e
Disposable: Example

That’s it guys! This is part five of the series on RxJava. I hope you enjoyed this article and found it useful; if so, please hit the Clap button. Let me know your thoughts in the comments section.

Happy coding!
