Part 3 — Transmitting Streams of values with Channels
So far in this series, we’ve examined Kotlin coroutine fundamentals in Part 1, and in Part 2 we saw how cancellation and structured concurrency work. In this third part, we will look at how a stream of items can be transmitted between coroutines with channels.
With Deferred, we can transfer a single value from one coroutine to another. Channels, on the other hand, provide a way to transmit a stream of values between coroutines. They essentially act as a pipe through which a stream of values can be sent and received. To make this possible, Channel implements two interfaces: SendChannel and ReceiveChannel. Together, these interfaces declare the channel's most important functions, such as send and close on SendChannel and receive on ReceiveChannel.
The send function is a suspending function that suspends when the channel cannot accept the item, which for an unbuffered channel means there is no receiver on the other end ready to receive it. Likewise, the receive function suspends if no item is available in the Channel. This behaviour is similar to that of a BlockingQueue, except that the sender and receiver functions suspend instead of blocking.
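To make the suspension behaviour concrete, here is a minimal sketch that records the order of events around a single send and receive on an unbuffered channel. The function name rendezvousEvents and the 100 ms delay are illustrative choices, not part of the library, and the sketch assumes the kotlinx.coroutines dependency is available.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens around one send/receive pair.
fun rendezvousEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>()       // default capacity: no buffer

    val sender = launch {
        events += "sending"
        channel.send(1)                // suspends: nobody is receiving yet
        events += "sent"               // runs only after the item was received
    }

    delay(100)                         // give the sender time to reach send()
    events += "receiving"
    val value = channel.receive()      // takes the item from the waiting sender
    events += "received $value"

    sender.join()                      // wait for the sender to finish
    events
}

fun main() {
    println(rendezvousEvents())
}
```

The point to notice is that "sent" can never appear before "receiving": the sender stays suspended inside send until the receiver arrives, yet no thread is blocked while it waits.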
A channel can be created using the Channel factory function. An optional integer parameter can also be passed as the buffer size.
The buffer size parameter determines the number of items that can be sent before the sender suspends. The default value for the buffer is 0, and this creates a RendezvousChannel. Aside from passing an explicit integer value as the buffer size, you can also pass one of the constants defined on Channel:
- Channel.UNLIMITED = Int.MAX_VALUE
- Channel.RENDEZVOUS = 0
- Channel.CONFLATED = -1
Passing Channel.UNLIMITED creates a LinkedListChannel, which is a channel with an unlimited buffer size.
Channel.CONFLATED creates a ConflatedChannel, which buffers only the most recent item sent to the channel and emits this to the receiver. This way, the receiver always gets the most recently sent item.
A RendezvousChannel has no buffer and only transfers an emitted item when the sender and receiver meet, that is, when the receiver is ready to receive at the time the sender sends an item. Otherwise, the sender suspends until there is a receiver.
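The effect of the different buffer sizes can be sketched by counting how many items a channel accepts when no receiver is present. This sketch uses trySend and tryReceive, the non-suspending counterparts of send and receive in recent versions of kotlinx.coroutines, so that a full channel reports failure instead of suspending. The helper names acceptedWithoutReceiver and latestFromConflated are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: how many of `attempts` items are accepted with no receiver?
// A failed trySend corresponds to a send that would have had to suspend.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..attempts) {
        if (channel.trySend(i).isSuccess) accepted++
    }
    return accepted
}

// Hypothetical helper: a conflated channel keeps only the latest item.
fun latestFromConflated(): Int? {
    val channel = Channel<Int>(Channel.CONFLATED)
    for (i in 1..5) channel.trySend(i)
    return channel.tryReceive().getOrNull()
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // 0: nothing is accepted without a receiver
    println(acceptedWithoutReceiver(3))                  // 3: the buffer fills up after three items
    println(acceptedWithoutReceiver(Channel.UNLIMITED))  // 5: every item is buffered
    println(acceptedWithoutReceiver(Channel.CONFLATED))  // 5: each item replaces the previous one
    println(latestFromConflated())                       // 5: only the most recent item remains
}
```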
Sending an item to a channel is done by calling the send suspending function. A single item can be consumed from the channel by calling receive, or the channel can be iterated with a simple for loop to consume each item as it is emitted.
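As a minimal sketch of both ways of consuming, the example below takes the first item with receive and the remaining ones with a for loop. The function name collectSquares and the choice of values are illustrative assumptions.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends five squares and collects them on the receiving side.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (x in 1..5) channel.send(x * x) // suspends until each item is taken
        channel.close()                     // signals that no more items will follow
    }

    val first = channel.receive()           // consume exactly one item
    val rest = mutableListOf<Int>()
    for (item in channel) rest += item      // loop ends once the channel is closed

    listOf(first) + rest
}

fun main() {
    println(collectSquares()) // [1, 4, 9, 16, 25]
}
```

Without the call to close, the for loop would suspend forever waiting for a sixth item.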
Alternatively, we can call the consume or consumeEach extension function on ReceiveChannel to consume each item as it is emitted by the channel.
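A small sketch of consumeEach, assuming a sender that closes the channel when it is done. The function name sumWithConsumeEach is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: adds up everything sent through the channel.
fun sumWithConsumeEach(): Int = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (x in 1..4) channel.send(x)
        channel.close()
    }

    var sum = 0
    channel.consumeEach { sum += it } // returns once the channel is closed and drained
    sum
}

fun main() {
    println(sumWithConsumeEach()) // 10
}
```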
Calling close closes the channel and terminates the stream. Closing the channel conceptually sends a special signal through the channel indicating that no further items will be sent. All items sent to the channel before calling close are guaranteed to be sent to the receiver.
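The following sketch illustrates both halves of that statement: items sent before close are still received, while an attempt to send afterwards is rejected. It uses an unlimited buffer so that sending needs no receiver, and trySend to observe the rejection without an exception. The name closeDemo is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the items received after close, and whether
// a send attempted after close was rejected because the channel is closed.
fun closeDemo(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (x in 1..3) channel.send(x)   // buffered, so send does not suspend
    channel.close()

    val rejected = channel.trySend(4).isClosed

    val received = mutableListOf<Int>()
    for (item in channel) received += item // drains the items sent before close

    Pair(received.toList(), rejected)
}

fun main() {
    println(closeDemo()) // ([1, 2, 3], true)
}
```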
A channel represents a hot stream of data: items can be emitted whether or not a receiver is present or subscribed. Once an item is consumed from a channel, the same item cannot be consumed again by another receiver. In other words, a channel that has been consumed does not re-emit its items to a receiver that subsequently tries to consume from it. This is unlike an Observable in RxJava, which emits the stream of items to each of its subscribers. For example, if consumeEach is called twice on the same channel, the first invocation consumes all the items from the channel and a subsequent attempt to consume items from this channel yields nothing. It is planned that in a future release of coroutines, calling consumeEach on a channel that has already been consumed will throw an IllegalStateException.
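The "consumed only once" behaviour can be sketched by iterating the same channel twice. This sketch deliberately uses plain for loops rather than consumeEach: consumeEach also cancels the channel when it finishes, and what a second call then does may depend on the library version, so treat that part as something to verify against the version you use. The name consumeTwice is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: iterates the same channel twice and returns both results.
fun consumeTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (x in 1..3) channel.send(x)
        channel.close()
    }

    val firstPass = mutableListOf<Int>()
    for (item in channel) firstPass += item   // receives every item

    val secondPass = mutableListOf<Int>()
    for (item in channel) secondPass += item  // channel is closed and empty: loop body never runs

    Pair(firstPass.toList(), secondPass.toList())
}

fun main() {
    println(consumeTwice()) // ([1, 2, 3], [])
}
```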
Sending Items to multiple listeners
With the basic channel, if we have multiple receivers waiting to receive items from the channel, the emitted items will be shared across all receivers: each item goes to only one of them, so no receiver gets every item. To address this, the library provides BroadcastChannel and ConflatedBroadcastChannel.
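The sharing behaviour of a basic channel can be sketched with two receivers reading from the same channel. How the items are split between the two is not something to rely on; the only property shown here is that each item is delivered exactly once overall. The name fanOut is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two receivers compete for the items of one basic channel.
fun fanOut(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>()
    val receivedByFirst = mutableListOf<Int>()
    val receivedBySecond = mutableListOf<Int>()

    // runBlocking runs both receivers on one thread, so plain lists are safe here.
    val first = launch { for (item in channel) receivedByFirst += item }
    val second = launch { for (item in channel) receivedBySecond += item }

    for (x in 1..10) channel.send(x)
    channel.close()                    // lets both for loops finish
    joinAll(first, second)

    Pair(receivedByFirst.toList(), receivedBySecond.toList())
}

fun main() {
    val (a, b) = fanOut()
    println("first receiver: $a")
    println("second receiver: $b")
    println((a + b).sorted() == (1..10).toList()) // true: every item arrived exactly once
}
```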
Both BroadcastChannel and ConflatedBroadcastChannel can send items to multiple listeners, but ConflatedBroadcastChannel only emits the most recently sent item when a receiver consumes items more slowly than they are sent. Future subscribers to this channel will also receive the most recently emitted item. This is similar to how a BehaviorSubject works in Rx. This is shown in the sample below:
In the above example, the first coroutine delays for 500ms and the second for 300ms, and both only get the most recent item sent to the channel after the delay. If this were implemented with a BroadcastChannel, both coroutines would have received all emitted items, the integers 1 to 10. The coroutine that subscribed later, after all items had been sent to the channel, only received the last item sent to the channel.
Unlike an ordinary channel, the send operation on a BroadcastChannel does not suspend if there are no receivers, while send on ConflatedBroadcastChannel never suspends at all. For example, the code below runs and ends successfully without suspending.
Producer
To produce items from a single point for receivers to consume, the kotlinx.coroutines library provides the produce coroutine builder, which returns a ReceiveChannel to the caller. The builder takes a suspending lambda as a parameter, and only the code within this lambda can send items to the channel.
The suspending lambda is an extension on ProducerScope, which wraps a SendChannel that is used to send items to the channel. Clients can iterate through the returned ReceiveChannel to consume the sent items. The channel is closed when the coroutine completes.
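A minimal sketch of a producer, assuming a hypothetical squares function that emits the first n square numbers. The @OptIn annotation is included because produce has been marked experimental in some library versions; it is harmless where it is not required.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: only the code inside the lambda can send to the channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(n: Int): ReceiveChannel<Int> = produce {
    for (x in 1..n) send(x * x) // send comes from ProducerScope
    // no explicit close: the channel is closed when this coroutine completes
}

// Hypothetical helper: collects everything the producer emits.
fun collectProduced(): List<Int> = runBlocking {
    val result = mutableListOf<Int>()
    for (item in squares(4)) result += item
    result
}

fun main() {
    println(collectProduced()) // [1, 4, 9, 16]
}
```

Because callers only see a ReceiveChannel, they cannot send into the stream themselves, which keeps production in one place.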
Actor
An actor is like the reverse of a producer: it allows items to be sent from multiple places but processed at one end, just like a mailbox. kotlinx.coroutines provides the actor coroutine builder, which returns a SendChannel to its caller. The returned SendChannel can then be used to send items to the coroutine.
The suspending lambda passed as a parameter to process the items sent to the channel is an extension on ActorScope. ActorScope wraps a ReceiveChannel that can be iterated to consume items sent to the channel. The channel is closed when the coroutine completes.
The actor coroutine builder is currently marked with the ObsoleteCoroutinesApi annotation to indicate that it will be deprecated in the future.
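The mailbox idea can be sketched with an actor that keeps a running total. The message types Add and GetTotal and the function names are hypothetical, and the sketch opts in to ObsoleteCoroutinesApi because of the annotation mentioned above. Several coroutines send Add messages, while the actor processes them one at a time, so its state needs no extra synchronisation.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message types for the mailbox.
sealed class Msg
data class Add(val amount: Int) : Msg()
class GetTotal(val reply: CompletableDeferred<Int>) : Msg()

@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.totalActor(): SendChannel<Msg> = actor<Msg> {
    var total = 0                      // state confined to the actor coroutine
    for (msg in channel) {             // channel comes from ActorScope
        when (msg) {
            is Add -> total += msg.amount
            is GetTotal -> msg.reply.complete(total)
        }
    }
}

fun sumViaActor(): Int = runBlocking {
    val mailbox = totalActor()

    // Three senders, each sending 1..4, all to the same mailbox.
    List(3) { launch { for (x in 1..4) mailbox.send(Add(x)) } }.joinAll()

    // Messages are processed in order, so the reply reflects every Add above.
    val reply = CompletableDeferred<Int>()
    mailbox.send(GetTotal(reply))
    val total = reply.await()

    mailbox.close()                    // lets the actor's loop and coroutine finish
    total
}

fun main() {
    println(sumViaActor()) // 30
}
```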
It’s important to note that the channel APIs are still experimental and subject to future changes. An abstraction for cold streams built on channels is also in the works and will affect a couple of the current APIs when implemented. You can follow the progress of that here: https://github.com/Kotlin/kotlinx.coroutines/issues/254
In the next part we will look at how coroutines interoperate with reactive streams.
Thanks for reading.