Flow has the `buffer(capacity: Int)` operator to adapt fast producers to slow consumers.
It offers some flexibility and supports `buffer(Channel.CONFLATED)`,
but it does not cover one important use-case that happens in practice.
Consider a flow of UI events (like keyboard/mouse clicks).
If they arrive too fast and the application cannot keep up with processing them,
it would be better to skip some events altogether at some point than to buffer
them indefinitely and process them later. When to drop/skip?
There might be multiple strategies based on time-limit or buffer-size.
It seems pretty natural to support at least some of those strategies directly in the `buffer` operator.
We can add an optional `whenFull: BufferStrategy` parameter to the `buffer` operator with the following values:
- `DEFAULT`: when the buffer is full the producer is suspended; events are never lost, with the exception of some special capacities (see below).
- `DROP_OLDEST`: drop the oldest value from the buffer.
- `DROP_LATEST`: drop the most recent (incoming) value.
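The three values can be illustrated with a small, non-concurrent model of a bounded buffer. This is only a sketch of the intended semantics, not the proposed implementation: `BoundedBuffer`, `offer` and `snapshot` are hypothetical names, and since a plain class cannot suspend, `offer` returns `false` at the point where a real producer would be suspended under `DEFAULT`.

```kotlin
// Hypothetical model of the proposal; none of these names exist in kotlinx.coroutines.
enum class BufferStrategy { DEFAULT, DROP_OLDEST, DROP_LATEST }

class BoundedBuffer<T>(
    private val capacity: Int,
    private val whenFull: BufferStrategy = BufferStrategy.DEFAULT
) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val items = ArrayDeque<T>()

    /**
     * Tries to add [value]. Returns false only when the buffer is full under DEFAULT,
     * which is where a real producer would suspend instead of returning.
     */
    fun offer(value: T): Boolean {
        if (items.size < capacity) {
            items.addLast(value)
            return true
        }
        return when (whenFull) {
            BufferStrategy.DEFAULT -> false
            BufferStrategy.DROP_OLDEST -> {
                items.removeFirst() // evict the oldest buffered value
                items.addLast(value)
                true
            }
            BufferStrategy.DROP_LATEST -> true // the incoming value is discarded
        }
    }

    fun snapshot(): List<T> = items.toList()
}

fun main() {
    for (strategy in BufferStrategy.values()) {
        val buffer = BoundedBuffer<Int>(capacity = 2, whenFull = strategy)
        (1..4).forEach { buffer.offer(it) }
        println("$strategy -> ${buffer.snapshot()}")
    }
    // DEFAULT -> [1, 2]
    // DROP_OLDEST -> [3, 4]
    // DROP_LATEST -> [1, 2]
}
```

In this model `DEFAULT` and `DROP_LATEST` leave the same contents after the burst; the difference is that under `DEFAULT` values 3 and 4 are not lost, since the producer would wait until there is room for them.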

The downside of this addition is that the `whenFull` parameter is not orthogonal to the `capacity` parameter:
- For `buffer(UNLIMITED)` the value of `whenFull` is irrelevant since an unlimited buffer is never full, so the strategy is never needed. We should ignore `whenFull` in this case.
- For `buffer(CONFLATED)` the value of `whenFull` is irrelevant as well, but there the definition of "full" is somewhat counterintuitive and the whole operator already behaves more like `DROP_OLDEST` with a twist that the most recent value is kept indefinitely. We should throw an exception if `DROP_LATEST` is specified, since it does not make sense at all.
- For `buffer(RENDEZVOUS)` the strategy of `DROP_OLDEST` cannot be implemented in a meaningful way, since there is no buffer, so we should throw an exception on this combination.
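The rules above could be checked in one place before a buffer is created. The following is a minimal sketch under several assumptions: `effectiveStrategy` is a hypothetical helper, the three constants are local stand-ins mirroring the values of `Channel.RENDEZVOUS`, `Channel.CONFLATED` and `Channel.UNLIMITED`, the proposal does not say which exception type to throw (`IllegalArgumentException` via `require` is used here), and reporting `DROP_OLDEST` for `CONFLATED` is a modeling choice based on the "behaves more like `DROP_OLDEST`" remark.

```kotlin
// Hypothetical enum from the proposal.
enum class BufferStrategy { DEFAULT, DROP_OLDEST, DROP_LATEST }

// Local stand-ins for the special capacities defined on Channel.
const val RENDEZVOUS = 0
const val CONFLATED = -1
const val UNLIMITED = Int.MAX_VALUE

/** Returns the strategy that actually applies, or throws for rejected combinations. */
fun effectiveStrategy(capacity: Int, whenFull: BufferStrategy): BufferStrategy =
    when (capacity) {
        // An unlimited buffer is never full, so whenFull is ignored.
        UNLIMITED -> BufferStrategy.DEFAULT
        CONFLATED -> {
            require(whenFull != BufferStrategy.DROP_LATEST) {
                "DROP_LATEST does not make sense for a CONFLATED buffer"
            }
            // Assumption: conflation is modeled as DROP_OLDEST.
            BufferStrategy.DROP_OLDEST
        }
        RENDEZVOUS -> {
            require(whenFull != BufferStrategy.DROP_OLDEST) {
                "DROP_OLDEST cannot be implemented without a buffer"
            }
            whenFull
        }
        else -> whenFull
    }

fun main() {
    println(effectiveStrategy(UNLIMITED, BufferStrategy.DROP_LATEST)) // DEFAULT
    println(effectiveStrategy(CONFLATED, BufferStrategy.DEFAULT))     // DROP_OLDEST
    println(effectiveStrategy(16, BufferStrategy.DROP_LATEST))        // DROP_LATEST
    println(runCatching { effectiveStrategy(RENDEZVOUS, BufferStrategy.DROP_OLDEST) }.isFailure) // true
}
```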

With this change, the use-case of UI event clicks might be reasonably solved with `buffer(1, BufferStrategy.DROP_OLDEST)`, but its behavior would be roughly equivalent to `buffer(CONFLATED)`.
This strategy will have to be actually implemented in channels, so the `Channel(...)` constructor function will add the corresponding `whenFull: BufferStrategy` parameter, too,
as will the `ArrayChannel` constructor (`ArrayChannel` will contain the bulk of the implementation).