Skip to content

Instantly share code, notes, and snippets.

@elizarov
Created January 24, 2020 09:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elizarov/b10e19f7699d242c6c777405ddfdc509 to your computer and use it in GitHub Desktop.
Save elizarov/b10e19f7699d242c6c777405ddfdc509 to your computer and use it in GitHub Desktop.
RFC: Flow: Optional backpressure-handling strategy in buffer operator

Flow has buffer(capacity: Int) operator to adapt fast consumers to slow producers. It does offer some flexibility and supports buffer(Channel.CONFLATED), but it does not cover one important use-case that happens in practice. Consider a flow of UI events (like keyboard/mouse clicks). If they go too fast and the application is not keeping up with their processing it would be better to skip the events altogether at some point than to buffer them indefinitely and process later. When to drop/skip? There might be multiple strategies based on time-limit or buffer-size. It seems pretty natural to support at least some of those strategies directly in buffer operator.

We can add an optional whenFull: BufferStrategy parameter to the buffer operator with the following values:

  • DEFAULT-- when the buffer is full the producer is suspended, events are never lost, with the exception of some special capacities (see below).
  • DROP_OLDEST -- drop the oldest value from the buffer.
  • DROP_LATEST -- drop the most recent (incoming) value.

The downside of this addition is that whenFull parameter is not orthogonal to the capacity parameter:

  • For buffer(UNLIMITED) the value of whenFull is irrelevant since an unlimited buffer is never full, so the strategy is never needed. We should ignore whenFull in this case.
  • For buffer(CONFLATED) the value of whenFull is irrelevant as well, but there the definition of "full" is somewhat counterintuitive and the whole operator already behaves more like DROP_OLDEST with a twist that the most recent value is kept indefinitely.
    We should throw exception if DROP_LATEST is specififed, since it does not make sense at all.
  • For buffer(RENDEZVOUS) the strategy of DROP_OLDEST cannot be implemented in a meaningful way, since there is no buffer, so we should throw an exception on this combination.

With this change, the use-case of UI event clicks might be reasonably solved with buffer(1, BufferStrategy.DROP_OLDEST), but its behavior would be roughly equivalent to buffer(CONFLATED).

This strategy will have to be actually implemented in channels, so Channel(...) constructor function will add the corresponding whenFull: BufferStrategy parameter, too, as well as ArrayChannel constructor (ArrayChannel will contain the bulk of implementation).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment