Fueled Reactive apps with Asynchronous Flow — Part 2 — Asynchronous communication: Streams & Basics

(This article was featured at Android #418 & Kotlin #201 Weekly)
Refactoring an application to use modern techniques and libraries can be really challenging. This post is the second part of the series, where I explain how I started learning the concepts that are essential for it.
Since the presentation had limited time, some topics were covered quickly. Here I want to go in depth, or at least try to, into a few subtle details.
Throughout this part, I will compare RxJava and Coroutines to get a better understanding of the important concepts. I will introduce the basics of the “Asynchronous” concept, such as “One-Shot Operations”, “Streams: Hot and Cold”, “Threading” and “Lifecycle”, as well as “Structured Concurrency”. Please read Part 1 if you haven’t yet for further context:
Asynchronous communication: Streams
What did I mean by Asynchronous communication?
First, let’s have a look at some definitions found here:
Asynchronous
(of a computer or other electrical machine) having each operation started only after the preceding operation is completed.
Computers, Telecommunications. of or relating to operation without the use of fixed time intervals (opposed to synchronous).
An asynchronous process is one that will complete at some point in the future, but we cannot be certain when. That typically means a long-running operation, or an operation running in the background while the application is doing something else.
When trying to explain Streams, I like to think about them as Conversations.

In this situation I am Raul and I would like to send a “good morning 🖖”. If we think about this in terms of a network request, I can embed parameters like the kind of greeting, and the greeting message itself. At this point, I am the sender.
Then let’s see what happens on the other side, the receiver.

Here Cristina has replied to me by first acknowledging (ACK) that the message was received.
Next, the receiver is preparing a response to the initial request.

She tells me she is teaching maths to a student online too. Here is where the observable starts the actual stream.
Let’s go back and look at the sender's side.

Once Raul has received the message and acknowledged it, the conversation can start.
What actually happens is that, if the conversation keeps going, we have an open stream of 1 to n messages sent and received. Speaking in RxJava terms:
The Observer (sender) expects a message from the Observable (receiver) at some point in the future, and this is what creates a stream.

Basics
Before jumping straight into writing code, something that helped me a lot to understand Kotlin Flow was comparing it with RxJava.
Let’s start with operations that emit only one single event:
One-Shot Operations
- When we want to return a non-nullable object:
RxJava uses Single<T> while Coroutines uses suspend () -> T
Rx: We return an Object by means of the Single type.
C: We return an Object by means of a suspend function.
- or if we want it nullable:
RxJava has Maybe<T> while Coroutines has suspend () -> T?
Rx: We return an Object, or nothing at all, by means of Maybe.
C: We return a Nullable Object by means of a suspend function.
- or a task needs to be completed but we don’t really need a result back:
RxJava has Completable while Coroutines has suspend () -> Unit
Rx: We return nothing (void in Java), but the task is signalled as completed or finished by means of the Completable type.
C: The suspend function returns Unit, which means no meaningful object is returned in Kotlin.
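As a minimal sketch of the three shapes above on the Coroutines side: the Message class, the function names and the delay calls are hypothetical stand-ins chosen for illustration, not code from the sample app.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical model for illustration; not the sample app's real class.
data class Message(val text: String)

// Counterpart of Single<Message>: always completes with a value (or throws).
suspend fun loadMessage(): Message {
    delay(10) // stands in for a long-running operation
    return Message("Live long and prosper")
}

// Counterpart of Maybe<Message>: null plays the role of "no value".
suspend fun findMessage(id: Int): Message? {
    delay(10)
    return if (id == 1) Message("Found it") else null
}

// Counterpart of Completable: the work finishes, nothing useful comes back.
suspend fun markAsRead(readIds: MutableSet<Int>, id: Int) {
    delay(10)
    readIds.add(id)
}

fun main() = runBlocking {
    val readIds = mutableSetOf<Int>()
    println(loadMessage())
    println(findMessage(1))
    println(findMessage(2))
    markAsRead(readIds, 1)
    println(readIds)
}
```

Notice that the return type alone carries the distinction that RxJava expresses with three different types.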
The following table summarises this:

Let’s see an example of how to write a suspend function for our messaging application described earlier:
suspend fun sendGreetings(param1: Greeting): Message =
    withContext(Dispatchers.IO) {
        sendMessageRetrieveMessageBack(param1)
    }
suspend functions can only be invoked from another suspend function or from a coroutine builder, which defines a scope for the coroutine to start running from. withContext defines which thread pool we want to use for that operation (to see the differences between RxJava and Coroutines keep reading below), in this case Dispatchers.IO, the pool meant for input/output work.
Looking at the function called inside, sendMessageRetrieveMessageBack should be another suspend function, which wouldn’t need to define a dispatcher of its own if the one it inherits is the one we want for it. sendGreetings is a suspend function started from a launch function like this:
data class Greeting(val type: TypeGreeting, val message: String)

val scope = CoroutineScope(Job() + Dispatchers.Main)
val vulcanGreetings = Greeting(TypeGreeting.VULCAN, "Good morning 🖖")

scope.launch {
    sendGreetings(vulcanGreetings)
}
For readers who are new to Coroutines, CoroutineScope might be something unfamiliar.
As Manuel Vivo defined it in the article Coroutines: First things first:
a CoroutineScope keeps track of any coroutine you create using launch or async (these are extension functions on CoroutineScope)
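For this same reason, we can use CoroutineScope right there. To start a coroutine whose result we don’t need back, we would use launch, but to start work that produces a result, for example several tasks running in parallel whose results we later await, we would need to use async instead. In this and the next posts, I will use launch.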
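To make the difference between the two builders concrete, here is a small sketch. fetchGreeting and fetchBothGreetings are hypothetical helpers, and delay stands in for a slow call such as a network request.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a slow call such as a network request.
suspend fun fetchGreeting(name: String): String {
    delay(50)
    return "Good morning, $name"
}

// async: both calls run concurrently and we await both results.
suspend fun fetchBothGreetings(): List<String> = coroutineScope {
    val first = async { fetchGreeting("Raul") }
    val second = async { fetchGreeting("Cristina") }
    listOf(first.await(), second.await())
}

fun main() = runBlocking {
    // launch: fire-and-forget, it returns a Job rather than a result.
    val job = launch { println(fetchGreeting("Spock")) }
    job.join()

    println(fetchBothGreetings())
}
```

launch hands back a Job we can join or cancel, while async hands back a Deferred whose value we obtain with await.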
Streams: Hot and Cold
Next we’ll learn about the differences between Hot and Cold streams now that we (hopefully) understand better what a stream looks like (first section of this article).
I really encourage you to read this very interesting article by Roman Elizarov called Cold flows and Hot channels where he accurately explains the differences between cold and hot streams.
To start with Cold streams, the following situation defined by Roman is a perfect example of when we would use Cold ones.
What if we don’t need either concurrency or synchronization, but need just non-blocking streams of data?
Again we go to the term non-blocking or what we mentioned at the beginning of this article: Asynchronous. This is indeed a Cold stream, something we can wait for after a long-running operation like a network request.
At the presentation I mentioned types of Cold streams depending on the framework:

For RxJava we can use Observable but for Kotlin Coroutines we would use Flow. Flows are Cold.
A strength of Flow is that it supports back-pressure as well. If we need to support back-pressure in RxJava, we would need a more specialised type called Flowable. Back-pressure is a very advanced topic, and won’t be covered in this article.
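What “Cold” means in practice can be shown with a tiny sketch: the producer block of a Flow does not run until someone collects it, and it runs again for every new collection. The greetings function and the producerLog list are made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records each time the producer block actually runs.
val producerLog = mutableListOf<String>()

fun greetings(): Flow<String> = flow {
    producerLog.add("started")
    emit("Good morning")
    emit("Live long and prosper")
}

fun main() = runBlocking {
    val cold = greetings()  // building the flow does not run the producer
    println(producerLog)
    println(cold.toList())  // collecting runs the producer
    println(cold.toList())  // collecting again restarts it from scratch
    println(producerLog)
}
```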

Channels are responsible for synchronising and orchestrating different entities to make sure they communicate at the moment it is needed; event streams are a good example of this.
For the particular Use Case mentioned in Part 1 with the sample app, the user typing a query is a good fit: we need to tell our presenter as soon as possible that there is a query to search for, and then the process starts. For this case, Channels are Hot, and we need them!
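A rough sketch of that idea, under the assumption that a plain rendezvous Channel is enough: one coroutine plays the user typing and another plays the presenter receiving each query. relayQueries is a hypothetical helper, not the sample app's implementation.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends each query through a channel and returns what the consumer received.
suspend fun relayQueries(queries: List<String>): List<String> = coroutineScope {
    val channel = Channel<String>() // rendezvous channel: no buffer
    val received = mutableListOf<String>()

    // Producer: plays the role of the user typing.
    launch {
        for (query in queries) channel.send(query)
        channel.close() // no more queries; this ends the consumer loop
    }

    // Consumer: plays the role of the presenter reacting to each query.
    for (query in channel) received.add(query)
    received
}

fun main() = runBlocking {
    println(relayQueries(listOf("k", "ko", "kot", "kotlin")))
}
```

Unlike a Flow, the channel exists and accepts values independently of who is consuming them, which is what makes it hot.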
Specific examples of cold and hot observables will be included in Part 3 but are not covered in this part.
Threading
In both RxJava and Coroutines, we have the option of controlling which thread our code will run on.
For RxJava we use Schedulers and for Coroutines we use Dispatchers.
The main built-in Dispatchers in Coroutines are:
- IO, for Input / Output operations.
- Default (the equivalent of the Computation scheduler in RxJava), for CPU-intensive operations.
- Main, a special case used for the Main Thread on Android.
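The following sketch shows withContext moving work onto a dispatcher and reports which thread each block ran on. The function names are hypothetical, and Dispatchers.Main is left out on purpose because it needs a platform such as Android to provide the main thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Each function returns the name of the thread its block ran on.
suspend fun threadNameOnIo(): String =
    withContext(Dispatchers.IO) { Thread.currentThread().name }

suspend fun threadNameOnDefault(): String =
    withContext(Dispatchers.Default) { Thread.currentThread().name }

fun main() = runBlocking {
    println("caller:  ${Thread.currentThread().name}")
    println("IO:      ${threadNameOnIo()}")
    println("Default: ${threadNameOnDefault()}")
}
```

The caller never manages threads directly; it only states which kind of work the block does.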
To better understand the differences amongst the types mentioned above, I encourage you to read this amazing article written by Erik Hellman about “Understanding CPU and I/O bound for asynchronous operations”.
Lifecycle
Lifecycle is probably the most important topic for us Android developers because it makes a big difference in terms of performance and memory leaks in our code. It is really important to handle the lifecycle appropriately in our applications!
In RxJava we would handle the lifecycle manually by keeping a Disposable for every Observable or Single subscription, and we would need to dispose of every one of them ourselves. If we have multiple Disposables in the same context, we can use a CompositeDisposable to manage them all:
val compositeDisposable = CompositeDisposable()
// Assumes an Rx version of sendGreetings that returns a Single<Message>.
compositeDisposable.add(sendGreetings(vulcanGreetings).subscribe())
compositeDisposable.dispose()
However, here we have a big difference with Coroutines: the lifecycle is handled by a CoroutineScope, whose context can be defined by combining a Dispatcher with a Job:
val scope = CoroutineScope(Job() + Dispatchers.Main)
scope.cancel()
If the Job is not defined and only the dispatcher is passed in, there is no problem, because the CoroutineScope factory function adds one behind the scenes, as we can see in its source code:
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
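We can check this behaviour ourselves with a short sketch. scopeWithoutExplicitJob is a hypothetical helper, and Dispatchers.Default replaces Dispatchers.Main only so that the snippet also runs outside Android.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive

// Only a dispatcher is passed in; no Job is provided explicitly.
fun scopeWithoutExplicitJob(): CoroutineScope = CoroutineScope(Dispatchers.Default)

fun main() {
    val scope = scopeWithoutExplicitJob()

    println(scope.coroutineContext[Job] != null) // was a Job added for us?
    println(scope.isActive)

    scope.cancel()
    println(scope.isActive) // the scope's state after cancellation
}
```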
Structured Concurrency
When cancelling a CoroutineScope, all coroutines started within it will be cancelled.
val scope = CoroutineScope(Job() + Dispatchers.Main)


This has several advantages, like auto-closure of the Flow and recursive cleanup: once all suspend functions in the chain have been cancelled, no memory leaks can happen.
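Here is a minimal sketch of that cascade, assuming a few long-running children started in the same scope. cancelScopeWithChildren is a hypothetical helper, and Dispatchers.Default is used instead of Dispatchers.Main so it can run outside Android.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Starts `count` long-running children, cancels the scope, and reports
// whether every child ended up cancelled.
suspend fun cancelScopeWithChildren(count: Int): Boolean {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val children = List(count) {
        scope.launch { delay(60_000) } // would run for a minute if left alone
    }
    scope.cancel()
    children.joinAll() // wait until cancellation has completed
    return children.all { it.isCancelled }
}

fun main() = runBlocking {
    println(cancelScopeWithChildren(3))
}
```

A single cancel call on the scope is enough; we never have to keep track of the individual children.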
However, with this setup, if a child fails because of a thrown exception, as a side effect the failure propagates to the parent and cancels the other children under that Scope too. For this reason, there are specialised scopes created by Google that handle more specific lifecycles, like viewModelScope or lifecycleScope.
To fix this problem we can also create our own scope by using a SupervisorJob(), like:
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
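The difference between the two kinds of Job can be observed with the sketch below. siblingSurvivesFailure is a hypothetical helper: it launches one child that throws and one sibling that waits, then reports whether the sibling was still active after the failure. With a plain Job the sibling is expected to be cancelled, with a SupervisorJob it is expected to keep running. Dispatchers.Default and the empty CoroutineExceptionHandler are assumptions made so the snippet runs quietly outside Android.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches one failing child and one sibling that waits for a signal.
// Returns true if the sibling was still active after the failure.
suspend fun siblingSurvivesFailure(parent: Job): Boolean {
    // Swallows the uncaught exception so it is not printed by the default handler.
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)

    val gate = CompletableDeferred<Unit>()
    val sibling = scope.launch { gate.await() }
    val failing = scope.launch { throw IllegalStateException("boom") }

    failing.join()
    val survived = sibling.isActive

    gate.complete(Unit) // let the sibling finish if it is still running
    sibling.join()
    return survived
}

fun main() = runBlocking {
    println("Job:           ${siblingSurvivesFailure(Job())}")
    println("SupervisorJob: ${siblingSurvivesFailure(SupervisorJob())}")
}
```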

For further details about cancellation and exception handling, I recommend reading the series of articles: Exceptions in Coroutines, Cancellation in Coroutines & Coroutine Patterns for Work that Shouldn’t be Cancelled. Big thanks to Florina Muntenescu & Manuel Vivo for all those articles and their talk about Coroutines at KotlinConf’19! Gotta catch ’em all!
This is all for Part 2, we have reviewed the following two sections of the presentation in this article:
- Asynchronous communication: Streams & Basics
If you liked this article, clap and share it, please!
Cheers!
Raul Hernandez Lopez
I want to give special thanks to Erik Hellman & Manuel Vivo for reviewing this article in depth and making it more readable. They are very knowledgeable and approachable, follow them.
(Update) The newest article talking about “Synchronous communication with the UI using StateFlow” (aka how to get rid of the Callbacks):
To follow up with the next sections:
- “Data layer Implementation”. How-to.
- “Use Case layer Implementation”. How-to.
- “View Delegate Implementation”. How-to.
- “Lessons learned & Next steps”. The end chapter with some reflections and personal opinions as well as closing comparative notes.
Previous article:
- Part 1: Use case & Migration strategy. If you need a reminder of why this whole adventure started for me or of the architecture decisions taken previously, or want to read this article with the presentation slides as support, have a look at: