Fueled Reactive apps with Asynchronous Flow — Part 4 — Use Case layer Implementation

(This article was featured at Android #419 & Kotlin #203 Weekly)
Once upon a time, I recall creating business logic into the View
was “all right”. For instance, not too many years ago most of the Android apps were built on top of a collaborator called Controller. Models contained a variety of mutable states and Controller had direct communication with the View. Does it sound familiar? Model-View-Controller (MVC) was a very popular “architecture” pattern at the time, not only for mobile apps but also on the web where it became popular.
I believe Clean Architecture started like a remedy to relieve centralised logic handling. There, like a specialisation is where the Use Case or Interactor truly brights. The Use Case shouldn’t care about where the data comes from, it just needs an input, it knows how to transform that into the expected output, and communicates back that result to the Presentation layer in some way.
Back in Part 1, you can find further details about this planned Migration strategy:
Throughout this, I will elaborate on the Implementation details from the Business layer, concretely I will explain in detail the “Use Case Design” and any possible updates we would need for lower-level layers when the Business layer migration is completed.
Use Case Design

Ideally, the previous Data layer Implementation is completed. Repository combined with Data Sources both are using Kotlin Flow
and suspend
functions like we did on:
Now, it’s time for the Use Case to collect results and maybe applying any business logic.
The Clean Architecture side migration might look like this once we are done:

Now we are relying on the View’s lifecycle.
Typically in Android, we use an @ActivityScope
. The name here is really up to you, some people prefer ViewScope
or PerActivityScope
. Whether you need a more grained view lifecycle, a FragmentScope
is another possibility. Or even a @RetainedScope
for covering configuration changes (CC), I won’t cover CC during this or next articles since properly handling CC is an advanced topic, surely callbacks won’t be enough for it.
Let’s look at SearchTweetUseCase
anatomy:
TweetsRepository
, where the data is coming from.TaskThreading
, where we define execution thread pools.
For this approach, originally, my Use Cases were inheriting from a generic UseCase
with a callback parameter, the reason is right inside the execute()
method, whose purpose is precisely starting to execute the Use Case.
RxJava uses Disposable
to take care of the lifecycle, for further details about lifecycle differences please read Part 2:
execute()
receives a query coming from the Presentation layer as well as the way back to pass the final output by means of a SearchCallback
.
Side note: nowadays there are more modern techniques to avoid using callbacks, however, my intention on this and next posts is migrating all collaborators but the Presenter, and keeping that one in Java. Therefore I won’t discuss any further about using more convenient ways to refresh UI such as StateFlow or LiveData.
The RxJava previous workflow is as follows:
- the view shows the loader by means of
callback.onShowLoader()
. This action could go into our Presenter instead of the Use Case since it’s view logic. subscribeOn()
defines that any business logic will be performed in the computation thread pool, without blocking the main thread.observeOn()
tells that the final result will pass through the main thread.subscribe()
defines both the happy pathonSuccess()
and the error pathonError()
.
Before subscribe()
is where, whether we would need a transformation or some specific business logic, we would introduce map()
. A collaborator would perform inside that function the specific operation for the business logic.
Let’s start the migration to Coroutines:
At the moment of the presentation, this required an @ExperimentalCoroutineApi
annotation. The scope where the coroutine runs is defined in CoroutineScope
, this starts the execution of the coroutine by using launch {}
. To handle errors Flow
provides a very convenient catch {}
operator. By using that, we avoid having to introduce this code try {} catch(e: Exception) {}
. Finally, to collect stream results from a Flow
we will use collect {}
.
At the code above I didn’t use flowOn()
to switch contexts on purpose. This is perfectly valid if the repository defined it already. It could be particularly useful in the case of having the previously mentioned situation, using map{}
and executing a calculation, then we would need to switch the IO Dispatcher
to become Default (computation thread pool).
Do we have any other way to write the previous code?
onEach{}
would do the equivalent to the collect{}
terminal operator. To finish launchIn()
defines the scope and launches the given flow into that scope.
Now that we defined the upper layer and the parent coroutine scope we can think about the advantages of Structured Concurrency.
How to cancel for a Coroutine Scope?
Just invoking cancel()
we will cancel our scope and all other children, cleaning all resources.
How to cancel a Disposable?
Now let’s see the previous implementation done in Java:
It’s really similar, we would just dispose()
the previous defined Disposable
, and that’s it.
Is that everything though? There is a subtle difference here, this just disposes this disposable contained within this UseCase, meaning that this same action should be performed in embedded disposables from other layers, they need to be disposed to avoid memory leaks as well.
Here, there is a clear advantage from Coroutines scopes using Structured Concurrency over RxJava Disposables.
Now, let’s say we want to reuse our RxJava Schedulers
during the whole migration by using them into Dispatchers
, would that be possible?
YES!
Re-using Schedulers like Dispatchers
This is really great because let’s say you are using those same Schedulers
to make threading pools on different mechanisms or libraries having all of them centralised. Under those circumstances, we can easily reuse them, of course using them for Coroutines by means of our Dispatchers
. This is thanks again to the kotlinx-coroutines-rx2 library.
With this we are done with the Clean Architecture side, isn’t it?
Well, bad news, not really.
Update code after full Clean migration
We still need to change the return type of our Repository.
We wouldn’t need any more asObserver()
to return an Observable
from the repo, because now all areas can work with the Flow
type.
Another interesting possibility would be whether we had the migration started from the Repository onwards:
.await()
transforms a given Single
(coming from the NetworkDataSourceImpl
) into a non-blocking suspend
function that is interpreted into this functionality. await
is another handy operator from kotlinx-coroutines-rx2.
So far, we have done a full migration of the Clean Architecture side, removing Cold observables.
Next part is to remove pending ones or what we called Hot observables.
This is all for Part 4, so far we have reviewed the business logic layer of the Implementation sub-section:
- Use Case layer Implementation
If you liked this article, clap and share it, please!
Cheers!
Raul Hernandez Lopez
I want to give a special thanks to Carlos Mota for reviewing this article and to make it more readable, he knows a ton about Kotlin Multiplatform, follow him! Thanks to Jorge Castillo for post-reviewing it!
(Update) The newest article talking about “Synchronous communication with the UI using StateFlow
" (aka how to get rid of the Callbacks):
To follow up, the Implementation cycle will be completed with the next Implementation sub-section:
- “View Delegate Implementation”. How-to.
And finally:
- “Lessons learned & Next steps”. The end chapter with some reflections and personal opinions as well as closing comparative notes.