Implement Kotlin Flow race/amb operator
Hello Kotlin developers, let's implement the `race`/`amb` operator for `Flow`. This operator is similar to `race` in RxJS and `amb` in RxJava. You can check the documentation linked below for more details.
`race` is used when we have multiple `Flow`s. We collect all of them, and as soon as one `Flow` emits a value, fails with an error, or completes, it becomes the “winner”. The others are canceled, and the resulting `Flow` forwards all events (including the error and complete events) from the “winner”.

https://rxjs.dev/api/index/function/race
Returns an observable that mirrors the first source observable to emit an item.
https://reactivex.io/documentation/operators/amb.html
When you pass a number of source Observables to Amb, it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb, either by emitting an item or sending an onError or onCompleted notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables.
Use case and example
This operator is useful when several sources can provide the same value, for example multiple API endpoints whose latency is unpredictable and varies significantly with network conditions. In that case, we want the “fastest” value and can ignore the “slower” ones.
Consider an example in which two `Flow`s race and the second one starts emitting first: only the values from the second `Flow` are emitted.
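A minimal sketch of such a race is shown here. It assumes the FlowExt library mentioned at the end of this article is on the classpath and exposes `race` in the `com.hoc081098.flowext` package; the delays and values are made up for illustration.

```kotlin
import com.hoc081098.flowext.race // assumption: FlowExt dependency is available
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
  // Illustrative sources: the first one needs 200 ms before its first value.
  val first = flow {
    delay(200)
    emit(1); emit(2); emit(3)
  }
  // The second one starts emitting after 100 ms, so it should win the race.
  val second = flow {
    delay(100)
    emit(4); emit(5); emit(6)
  }

  // Only the winner's values are collected; the loser is canceled.
  val result = race(first, second).toList()
  println(result) // with these delays: [4, 5, 6]
}
```

Swapping the two delays makes the first `Flow` the winner instead, which is a quick way to see that the result depends only on which source produces an event first.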
Implementation
`race` works in the following way:
- Collect all source `Flow`s.
- When the first event arrives from a source `Flow`, pass it down to the collector.
- Cancel all other `Flow`s.
- Forward all events from the winner `Flow`.
The implementation of `race` relies on the “select expression”, which lets us select the first event that becomes available from several `Channel`s (https://kotlinlang.org/docs/select-expression.html#selecting-from-channels).
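The four steps and the select expression can be sketched as follows. This is a simplified illustration that assumes only `kotlinx-coroutines-core`; the name `raceSketch` is hypothetical, and the code is not necessarily identical to the library's implementation.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical name, used only for this sketch.
@OptIn(ExperimentalCoroutinesApi::class) // `produce` is an experimental API
fun <T> raceSketch(flows: List<Flow<T>>): Flow<T> = flow {
  coroutineScope {
    // 1. Collect every source Flow into its own rendezvous channel.
    val channels: List<ReceiveChannel<T>> = flows.map { source ->
      produce { source.collect { send(it) } }
    }
    if (channels.isEmpty()) return@coroutineScope

    // 2. Suspend until the first channel delivers a value or is closed
    //    (closed without a cause = completed, closed with a cause = failed).
    val (winnerIndex, firstEvent) = select<Pair<Int, ChannelResult<T>>> {
      channels.forEachIndexed { index, channel ->
        channel.onReceiveCatching { result -> index to result }
      }
    }

    // 3. Cancel all other Flows.
    channels.forEachIndexed { index, channel ->
      if (index != winnerIndex) channel.cancel()
    }

    // 4. Forward the first event and everything that follows from the winner.
    if (firstEvent.isSuccess) {
      emit(firstEvent.getOrThrow())
      emitAll(channels[winnerIndex])
    } else {
      // A null cause means the winner completed normally without emitting.
      firstEvent.exceptionOrNull()?.let { throw it }
    }
  }
}

fun main() = runBlocking {
  val slow = flow { delay(200); emit(1); emit(2) }
  val fast = flow { delay(50); emit(10); emit(20) }
  println(raceSketch(listOf(slow, fast)).toList()) // [10, 20]
}
```

Because each producer sends into a rendezvous channel, a source is suspended in `send` until the downstream collector is ready for the next value, which is how backpressure is preserved in this sketch.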
As a bonus, we can add a variant function that accepts variable arguments, and `raceWith` as an extension function on `Flow`.
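One possible shape for these variants is sketched here. It assumes FlowExt is on the classpath and provides a `race` overload that takes an `Iterable` of `Flow`s; the names `raceOf` and `raceWithSketch` are hypothetical and chosen so they do not clash with the library's own functions.

```kotlin
import com.hoc081098.flowext.race // assumption: race(Iterable<Flow<T>>) exists
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical vararg variant: requiring two Flows up front makes
// a "race" with fewer than two participants impossible to write.
fun <T> raceOf(first: Flow<T>, second: Flow<T>, vararg others: Flow<T>): Flow<T> =
  race(listOf(first, second) + others)

// Hypothetical extension variant: the receiver takes part in the race too.
fun <T> Flow<T>.raceWithSketch(other: Flow<T>, vararg others: Flow<T>): Flow<T> =
  race(listOf(this, other) + others)

fun main() = runBlocking {
  val slow = flow { delay(200); emit("slow") }
  val fast = flow { delay(50); emit("fast") }

  println(raceOf(slow, fast).toList())         // [fast]
  println(slow.raceWithSketch(fast).toList())  // [fast]
}
```

Both helpers only build a list and delegate, so all of the collection, cancellation, and forwarding logic stays in one place.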
Conclusion
We have now implemented the `race` operator. Using `Channel` and the “select expression” makes the backpressure and concurrency problems a lot easier to handle.
You can find the full implementation and other `Flow` operators at https://github.com/hoc081098/FlowExt, along with the entire source code, the documentation, and setup instructions for adding the library to your project.
FlowExt is a Kotlin Multiplatform library that provides many operators and extensions for Kotlin Coroutines Flow:
- `concat`
- `interval`
- `neverFlow`
- `race`
- `amb`
- `range`
- `timer`
- `bufferCount`
- `concatWith`
- `startWith`
- `flatMapFirst`
- `exhaustMap`
- `flattenFirst`
- `exhaustAll`
- `mapIndexed`
- `mapTo`
- `mapToUnit`
- `materialize`
- `dematerialize`
- `raceWith`
- `ambWith`
- `retryWhenWithDelayStrategy`
- `retryWhenWithExponentialBackoff`
- `retryWithExponentialBackoff`
- `takeUntil`
- `throttle`
- `throttleTime`
- `withLatestFrom`
- …
Thanks for reading ❤. If you like my article, please follow me on Medium, Github and Twitter.