Fueled Reactive apps with Asynchronous Flow — Part 5 — View Delegate Implementation

(This article was featured at Android #420 & Kotlin #204 Weekly)
The View is such an interesting and challenging artefact to deal with. As Android engineers we can be very tempted to create a custom View, Fragment, Activity, etc. and put extra (unneeded) logic inside it.
I truly believe moving logic away from the view makes our code more testable. Over the years I’ve learned that the more logic and responsibilities you decouple from the view, the easier your code becomes to test.
A good design pattern for that purpose is the Delegate pattern (of course, there are others we could take advantage of): a collaborator supports the view directly by handling the work the view shouldn’t have to do.
If you want to recap what my Migration Strategy was, please have a look at Part 1 for further details.
Throughout this article, I will elaborate on some implementation details of the View. Concretely, I will explain the “View Delegate Design” in detail. The sub-sections are: “Creating a ViewDelegate with RxJava”, “Creating a ViewDelegate with Channels”, “Starting the query” & “Clean up Resources”. The last sub-section, “Complete Migration”, covers any final updates we need to make to complete our migration to Kotlin Coroutines.
Previously we probably completed a similar approach to the Clean Architecture migration:
View Delegate Design

Originally we came from a Java View implementation using RxJava mechanisms such as PublishSubject or BehaviorSubject to open synchronous streams.
If you need a refresher on the differences between Synchronous and Asynchronous communications (streams), I recommend reading Part 2.

In the diagram above, we can observe both the View Delegate and the View Listener. Decoupling is a foundational task: it lets us keep some elements in Java while others move to Kotlin. The entities we create become smaller, and we can take advantage of this approach to add Coroutines to the new Kotlin files. Remember that we cannot add Coroutines to Java files, so this is especially relevant if we want to integrate Kotlin Coroutines.
Now, let’s take a look at what it looks like after the last migration:

The diagram above highlights where we could potentially use Kotlin Coroutines or Flows. Along the way, I explained the reasons for that in each past article.
Creating a ViewDelegate with RxJava ✅
The anatomy of the SearchViewDelegate transformed into Kotlin looks like:
Here, we are just passing in the SearchTweetPresenter so that we can start the user’s query once it is typed into the SearchView (keep reading for further details about this special kind of View).
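A minimal sketch of that shape, assuming plain constructor injection. The SearchTweetPresenter interface below is only a stand-in that declares the calls named in this article (the real presenter is a Java class), and onQueryTyped() is a hypothetical entry point added for illustration:

```kotlin
// Stand-in for the Java presenter: only the calls named in this article.
// Parameter types are assumptions.
interface SearchTweetPresenter {
    fun searchTweet(query: String)
    fun showProblemHappened()
    fun removeView()
}

// The delegate holds the presenter so the view itself carries no search logic.
class SearchViewDelegate(private val presenter: SearchTweetPresenter) {

    // Hypothetical entry point: forwards a typed query to the presenter.
    fun onQueryTyped(query: String) = presenter.searchTweet(query)
}
```

Keeping the presenter behind the delegate means the view only has to know about the delegate, which is what makes the view easy to swap for a fake in tests.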
As a brief reminder, I won’t change any Presenters at all; they look like this in Java:
They have an @ActivityScope and use SearchTweetUseCase under the hood to execute each search query. removeView() will be called in the last steps of this article, so please keep reading.
I also used the following View interface from the SearchTweetPresenter, where the show/hide actions are defined:
Side note: these or any other actions could be defined in the View interface depending on your needs. In the same way, Presenters could take other approaches, like using StateFlow or LiveData to automatically update the UI without the need for Callbacks on Android. These are just examples intentionally used for the use case of my sample app during the “Fueled Reactive apps with Asynchronous Flow” presentation.
Now let’s prepare the SearchViewDelegate and its SearchViewListener.
Looking into the SearchViewDelegate, we can create a subtype of Subject, such as BehaviorSubject, which the SearchViewListener will then use.
Therefore this listener will be associated with the View using the setter setOnQueryTextListener.
Our SearchView needs to set up its OnQueryTextListener in order to communicate any new query typed by the user. For every call to onQueryTextChange, our subject emits the user’s input query by means of subject.onNext().
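The listener described above can be sketched as follows, assuming RxJava 2. The OnQueryTextListener interface here is a stand-in that mirrors Android’s SearchView.OnQueryTextListener so the example runs off-device; on Android the real interface would be implemented and registered via setOnQueryTextListener:

```kotlin
import io.reactivex.subjects.BehaviorSubject

// Stand-in mirroring android.widget.SearchView.OnQueryTextListener.
interface OnQueryTextListener {
    fun onQueryTextChange(newText: String): Boolean
    fun onQueryTextSubmit(query: String): Boolean
}

// Pushes every text change into the subject.
class SearchViewListener(
    private val subject: BehaviorSubject<String>
) : OnQueryTextListener {

    override fun onQueryTextChange(newText: String): Boolean {
        subject.onNext(newText) // emit the latest query downstream
        return true             // the change was handled here
    }

    // Submission is not used in this sketch; searching happens as the user types.
    override fun onQueryTextSubmit(query: String): Boolean = false
}

fun main() {
    val subject = BehaviorSubject.create<String>()
    val listener = SearchViewListener(subject)
    // On Android: searchView.setOnQueryTextListener(listener)
    val disposable = subject.subscribe { println("query: $it") }
    listener.onQueryTextChange("kot")
    listener.onQueryTextChange("kotlin")
    disposable.dispose()
}
```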
Now let’s change this into a Coroutines with Channels approach.
Creating a ViewDelegate with Channels ✅
The previously mentioned BehaviorSubject would need to become a Channel to synchronously communicate real-time events. Channels can do this because, conceptually, they work like a BlockingQueue, with suspending operations instead of blocking ones.
The prepareViewDelegateListener() looks really similar to the previous RxJava approach. The main difference is that, by using the channelFlow{} builder instead, a channel is passed to the SearchViewListener. That channel is of type SendChannel, and it is created and provided to the builder block via ProducerScope. I will get back to the // clean up comment later.
An important note from the channelFlow documentation:
The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.
Now, the SearchViewListener receives the SendChannel from the channelFlow{} builder block. This is equivalent to the previous implementation: to pass on each new query we just need to invoke channel.offer(). At the time of this presentation, SendChannel needed an @ExperimentalCoroutinesApi annotation on our class constructor or method signature.
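A minimal sketch of this Channels version, under a few assumptions. FakeSearchView and OnQueryTextListener are stand-ins for the Android SearchView and its listener interface so the example stays self-contained. Also, channel.offer() has since been deprecated in kotlinx.coroutines in favour of trySend(), so the sketch uses trySend():

```kotlin
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow

// Stand-in mirroring android.widget.SearchView.OnQueryTextListener.
interface OnQueryTextListener {
    fun onQueryTextChange(newText: String): Boolean
    fun onQueryTextSubmit(query: String): Boolean
}

// Hypothetical stand-in for the SearchView widget.
class FakeSearchView {
    var listener: OnQueryTextListener? = null
        private set

    fun setOnQueryTextListener(listener: OnQueryTextListener?) {
        this.listener = listener
    }
}

// Sends every text change into the channel handed over by channelFlow{}.
class SearchViewListener(
    private val channel: SendChannel<String>
) : OnQueryTextListener {

    override fun onQueryTextChange(newText: String): Boolean {
        channel.trySend(newText) // non-suspending; replaces the older offer()
        return true
    }

    override fun onQueryTextSubmit(query: String): Boolean = false
}

// Cold flow: the listener is only registered once a terminal operator runs.
fun prepareViewDelegateListener(view: FakeSearchView): Flow<String> = channelFlow {
    // `channel` is the SendChannel exposed by ProducerScope.
    view.setOnQueryTextListener(SearchViewListener(channel))
    awaitClose {
        // clean up (see "Clean up Resources")
    }
}
```

Without the awaitClose call the builder block would return immediately and the flow would complete before the user types anything.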
Starting the query ✅
Once we have created all means of passing different events from the view, we need to handle them accordingly to start searching for a query.
First, let’s go back to the RxJava implementation.
- All changes that are coming from the view need to be observed on the UI thread.
- Optionally, we debounce() every 400 milliseconds (ms). The exact number of ms is really up to you; however, this operation needs to be performed on the computation thread pool, off the main thread.
- With distinctUntilChanged() we make sure not to act if there are no new changes in the query.
- We probably want to filter{} the execution of the query under certain scenarios; let’s say we don’t want to enable an initial query for just a # or @. This is just an example; if you are happy without filtering, just drop that line.
- Finally, with subscribe(), on the happy path we search for tweets for a particular query: searchTweet(query).
- Or, if any exception or weird edge case happens, we show an error message with showProblemHappened(), using a nice Dialog, Toast, SnackBar, etc.
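The steps above can be sketched as follows, assuming RxJava 2. The presenter interface is a stand-in, and the two Scheduler parameters are assumptions standing in for the app’s UI and computation schedulers. In this sketch observeOn() is placed last so that both presenter calls run on the UI scheduler; the original ordering may differ:

```kotlin
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.subjects.BehaviorSubject
import java.util.concurrent.TimeUnit

// Stand-in for the Java presenter used in this article.
interface SearchTweetPresenter {
    fun searchTweet(query: String)
    fun showProblemHappened()
}

fun observeSubject(
    subject: BehaviorSubject<String>,
    presenter: SearchTweetPresenter,
    computation: Scheduler, // e.g. Schedulers.computation()
    ui: Scheduler           // e.g. AndroidSchedulers.mainThread() on Android
): Disposable =
    subject
        // Wait for 400 ms of silence, timed on the computation scheduler.
        .debounce(400, TimeUnit.MILLISECONDS, computation)
        // Skip a query identical to the previous one.
        .distinctUntilChanged()
        // Example filter: ignore a lone "#" or "@".
        .filter { it != "#" && it != "@" }
        // Deliver results and errors on the UI scheduler.
        .observeOn(ui)
        .subscribe(
            { query -> presenter.searchTweet(query) },
            { presenter.showProblemHappened() }
        )
```

The returned Disposable has to be kept so that it can be disposed when the view goes away.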
Let’s have a look at the same code with Kotlin Coroutines:
- I used launch{} here to start the stream on the main thread with a SupervisorJob() (if you wonder why I used SupervisorJob, Part 2 gives you my reasons).
- debounce() implicitly uses ms by default. We need the @ExperimentalCoroutinesApi annotation; however, to use debounce() a @FlowPreview was also needed at the time of this presentation.
- distinctUntilChanged() & filter{}: no real difference here, same operations as in the RxJava implementation.
- Now I use flowOn() with the computation dispatcher because it goes upstream (operators above it) and affects debounce, distinctUntilChanged & filter.
- After those, we use catch and collect. They are executed on the Dispatcher of the calling scope (uiDispatcher, our main thread).
- catch{} captures any upstream exceptions coming from previous operations too.
- Finally, the collect{} terminal operator triggers the collection of the current Flow and executes presenter.searchTweet(query) with our user’s input query.
- If an exception happens in the provided flow, the catch block will be called. However, if it happens during collect, it’ll be propagated to the scope. To avoid side effects, we might want to surround that code with try/catch.
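The chain described above can be sketched like this. The presenter interface is a stand-in, and the scope and dispatcher are passed in as parameters (an assumption) so that the function can also run outside Android. @OptIn(FlowPreview::class) is used because debounce() is still marked as a preview API:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Stand-in for the Java presenter used in this article.
interface SearchTweetPresenter {
    fun searchTweet(query: String)
    fun showProblemHappened()
}

@OptIn(FlowPreview::class)
fun observeChannelAsFlow(
    queries: Flow<String>,
    presenter: SearchTweetPresenter,
    scope: CoroutineScope,           // e.g. CoroutineScope(SupervisorJob() + Dispatchers.Main)
    computation: CoroutineDispatcher // e.g. Dispatchers.Default
): Job = scope.launch {
    queries
        .debounce(400L)                    // milliseconds
        .distinctUntilChanged()
        .filter { it != "#" && it != "@" } // example filter
        .flowOn(computation)               // applies to the operators above this line
        .catch { presenter.showProblemHappened() } // upstream failures only
        .collect { query ->
            // Failures thrown here are not seen by catch{} above, hence the try/catch.
            try {
                presenter.searchTweet(query)
            } catch (e: Exception) {
                presenter.showProblemHappened()
            }
        }
}
```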
SearchPresenter is a Java file; for that reason our SearchViewDelegate’s scope ends here, meaning we need to start new scopes in the other areas that use Kotlin Coroutines, as necessary.
Another possibility to avoid using the launch{} block:
This method is equivalent to the previous one; now onEach replaces what collect does for each emitted value. Furthermore, we can launch the flow in our scope by means of launchIn. The main difference from the previous version is that errors thrown inside collect weren’t caught before, whereas errors thrown inside onEach now are: since catch works upstream and is placed after onEach, it will catch them all.
As an extra, right after filter{} I deliberately added the .map{} function to trim() our input query; of course, this is optional.
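A sketch of this launchIn variant, under the same assumptions as before (stand-in presenter interface, scope and dispatcher passed in as parameters). Note that onEach sits above catch, which is what lets catch see its failures:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*

// Stand-in for the Java presenter used in this article.
interface SearchTweetPresenter {
    fun searchTweet(query: String)
    fun showProblemHappened()
}

@OptIn(FlowPreview::class)
fun observeChannelAsFlow(
    queries: Flow<String>,
    presenter: SearchTweetPresenter,
    scope: CoroutineScope,           // e.g. CoroutineScope(SupervisorJob() + Dispatchers.Main)
    computation: CoroutineDispatcher // e.g. Dispatchers.Default
): Job =
    queries
        .debounce(400L)
        .distinctUntilChanged()
        .filter { it != "#" && it != "@" } // example filter
        .map { it.trim() }                 // optional: trim the input query
        .flowOn(computation)               // everything above runs off the main thread
        .onEach { query -> presenter.searchTweet(query) }
        .catch { presenter.showProblemHappened() } // also covers failures from onEach
        .launchIn(scope)                   // collects in the scope, returns its Job
```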
Clean up Resources ✅
For the RxJava implementation, we would manually invoke cleanResources() to avoid memory leaks. Within that function, we remove the listener and dispose() the Disposable previously created (during the observeSubject() execution).
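A compact sketch of that manual clean-up, assuming RxJava 2. RxSearchResources is a hypothetical holder written for illustration, and the removeListener callback stands in for clearing the listener on the real view:

```kotlin
import io.reactivex.disposables.Disposable
import io.reactivex.subjects.BehaviorSubject

// Hypothetical holder for what the RxJava delegate has to release.
class RxSearchResources(
    // e.g. { searchView.setOnQueryTextListener(null) } on Android
    private val removeListener: () -> Unit
) {
    private var disposable: Disposable? = null

    fun observeSubject(subject: BehaviorSubject<String>, onQuery: (String) -> Unit) {
        // Keep the Disposable so the subscription can be released later.
        disposable = subject.subscribe { onQuery(it) }
    }

    fun cleanResources() {
        removeListener()      // the view stops pushing queries
        disposable?.dispose() // the subscription stops receiving them
        disposable = null
    }
}
```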
Let’s see what it looks like when applying a Channels approach.
Going back to the prepareViewDelegateListener(), we have a very convenient block called awaitClose{} right inside channelFlow{}. awaitClose will suspend the coroutine of this channelFlow until the channel is closed or the collection of the flow is cancelled. Only then will it resume and clean up those resources. Adding any necessary statements inside that block prevents possible memory leaks. Note that we don’t have to clean up our channel instance manually because channelFlow{} will take care of it for us.
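To illustrate when the awaitClose{} block runs, here is a small self-contained sketch. FakeSearchView is a hypothetical stand-in that holds a single listener, and trySend() is used in place of the older offer():

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a view that accepts a single text listener.
class FakeSearchView {
    var listener: ((String) -> Unit)? = null
}

fun FakeSearchView.queries(): Flow<String> = channelFlow {
    listener = { text -> trySend(text) } // register on collection
    awaitClose { listener = null }       // unregister once collection ends
}

fun main() = runBlocking {
    val view = FakeSearchView()
    val job = launch { view.queries().collect { println("query: $it") } }

    while (view.listener == null) yield() // wait until the flow registered its listener
    view.listener?.invoke("kotlin")
    delay(50)                             // give the collector a chance to print

    job.cancelAndJoin()                   // cancelling the collector triggers awaitClose{}
    println("listener removed: ${view.listener == null}")
}
```

Because the producer coroutine is a child of the collecting coroutine, joining the cancelled job also waits for the awaitClose{} block to finish.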
What does the clean-up look like for the CoroutineScope?
As we can see, our SearchViewDelegate makes sure to cancel the CoroutineScope we used (main thread for our views) by calling cancel(), and it also tells the SearchPresenter to remove any listeners from the view via removeView().
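A minimal sketch of that clean-up. The presenter interface is a stand-in that declares only removeView(), and the dispatcher is passed in as a parameter (an assumption) so the class can be exercised without Android:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel

// Stand-in for the Java presenter: only the call needed here.
interface SearchTweetPresenter {
    fun removeView()
}

class SearchViewDelegate(
    private val presenter: SearchTweetPresenter,
    uiDispatcher: CoroutineDispatcher // Dispatchers.Main on Android
) {
    // Scope used to collect the query flow for this view.
    val scope = CoroutineScope(SupervisorJob() + uiDispatcher)

    fun cleanUp() {
        scope.cancel()         // cancels every coroutine launched in this scope
        presenter.removeView() // lets the presenter drop its view reference
    }
}
```

Once cancelled, the scope cannot launch new work, so a fresh delegate (or scope) is needed if the view is recreated.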
Finally, let’s clean up all resources when our view is destroyed.
When SearchTweetActivity (our main view) is being destroyed, it invokes the cleanUp() method from the SearchViewDelegate (previously explained) and makes sure all views can be garbage collected by removing all hard references (nullifying them).
Are we finally done now?
Complete Migration ✅
I think the last, but not least important, thing is optimising resources and cleaning up the TaskThreadingImpl. For this, we can probably remove the previously declared Schedulers if we don’t need them anymore.
Now that we can get rid of those Schedulers and their getter methods for good, we can directly use the native Kotlin Dispatchers. Unless, of course, you still need the previous Schedulers and their thread execution pools; this could be the case if we are reusing them for a particular library, in which case this step can be skipped. The way to directly convert back from Dispatchers to Schedulers is still pending.
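After that clean-up, a Dispatchers-only version could look like the sketch below. The TaskThreading interface and its method names are assumptions made for illustration; only the TaskThreadingImpl name comes from this article:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

// Hypothetical abstraction over the dispatchers the app uses.
interface TaskThreading {
    fun ui(): CoroutineDispatcher
    fun io(): CoroutineDispatcher
    fun computation(): CoroutineDispatcher
}

// Backed by the built-in Kotlin dispatchers; no RxJava Schedulers left.
class TaskThreadingImpl : TaskThreading {
    override fun ui(): CoroutineDispatcher = Dispatchers.Main
    override fun io(): CoroutineDispatcher = Dispatchers.IO
    override fun computation(): CoroutineDispatcher = Dispatchers.Default
}
```

Keeping the dispatchers behind an interface makes it straightforward to swap in test dispatchers.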
That was all I wanted to cover for Part 5. So far we have reviewed the last part of the View layer Implementation:
- View Delegate Implementation
If you liked this article, clap and share it, please!
Cheers!
Raul Hernandez Lopez
I want to give special thanks to Manuel Vivo for reviewing this article and making it more readable. Follow him! He knows a ton about Kotlin and Android.
A big & special thanks to Cristina P. for the proof-reading of this article.
(Update) The newest article talks about “Synchronous communication with the UI using StateFlow” (aka how to get rid of the Callbacks):
To follow up, I will complete this series of articles with:
- “Lessons learned & Next steps”. The end chapter with some reflections and personal opinions as well as closing comparative notes.
Do you want to recapitulate any previous topics? A previous Part not mentioned during this article is “Data layer Implementation”: