Problematic Timestamping Situations in the Kafka Sink

The Kafka sink doesn't deal well with the case where all of the data has one and the same timestamp.

When does all data have the same timestamp?

  • When writing out an existing arrangement, where timestamps are compacted (the default).

Symptoms/Reasons (as far as I can tell)

With the current sink implementation

  1. The sink stashes data with timestamp t until the frontier is t+1. We do this to a) write each complete timestamp as one transaction and b) write data in strictly advancing timestamp order.
  2. The sink stashes data with timestamp t until all data with timestamp t has been written and committed. We do this to be able to retry writing/committing in case of failure. (A sketch of this stashing logic follows the list.)

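To make the stashing behavior concrete, here is a minimal Rust sketch of the logic described in 1. and 2. above. The types (`Row`, `Timestamp`, `Diff`) and the `write_transaction` helper are purely illustrative and not the actual sink implementation:

```rust
use std::collections::BTreeMap;

type Timestamp = u64;
type Row = Vec<u8>;
type Diff = i64;

/// Illustrative stand-in for the per-sink stash; not the real implementation.
struct SinkStash {
    /// Updates buffered per timestamp. They are kept until the frontier has
    /// passed the timestamp *and* the corresponding transaction committed.
    pending: BTreeMap<Timestamp, Vec<(Row, Diff)>>,
}

impl SinkStash {
    fn stash(&mut self, t: Timestamp, row: Row, diff: Diff) {
        self.pending.entry(t).or_default().push((row, diff));
    }

    /// Called when the input frontier advances: every timestamp strictly
    /// below the frontier is complete and can be written out as one
    /// transaction, in strictly advancing timestamp order.
    fn frontier_advanced_to(&mut self, frontier: Timestamp) {
        let ready: Vec<Timestamp> =
            self.pending.range(..frontier).map(|(t, _)| *t).collect();
        for t in ready {
            let updates = self.pending.remove(&t).expect("timestamp is present");
            if write_transaction(t, &updates).is_err() {
                // The commit failed: put the updates back so we can retry.
                // This is exactly why everything has to stay in memory until
                // the transaction has committed.
                self.pending.insert(t, updates);
            }
        }
    }
}

/// Placeholder for the actual transactional Kafka write.
fn write_transaction(_t: Timestamp, _updates: &[(Row, Diff)]) -> Result<(), ()> {
    Ok(())
}
```

With fully compacted input, all updates land in a single `pending` entry, so nothing can be written until the frontier finally moves past that one timestamp, and the entire dataset sits in the stash until the commit succeeds.
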
We can get around 1. (the ordering at least) by simply relaxing the ordering requirement.

We can get around 2. by restarting the complete sink instead of stashing records for the case where we need to re-attempt a transaction. This is potentially quite a bit more costly than just stashing and re-writing the latest batch of records. Stashing records until we have successfully committed them is mostly a problem when a large number of records have the same timestamp. Then again, when sinking a materialized view, the data we're writing is just sitting there already. Restarting the whole sink only seems to be a problem when writing straight from a source without a materialized view in between.

We don't currently have the infrastructure in place that would allow killing/restarting a sink dataflow.

It seems we could relieve some of the problems by not allowing users to sink straight from sources and instead always requiring them to sink a materialized view. That seems like a product decision.

Solution Sketch

I think the only viable solution is to not stash updates in the sink but instead stream them straight through. For this to work we need two things:

  • Relaxed ordering requirements: ordering can be factored out into an optional operator that can be requested via config (see 1. above).
  • Don't rely on stashing to be able to retry failed transactions (see 2. above). We need to teach Materialize to restart the sink dataflow on failure, instead of the sink stashing and retrying internally. (A sketch of this restart-based retry follows the list.)
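
A minimal sketch of the second point, assuming a hypothetical `build_and_run_sink_dataflow` entry point; the actual restart mechanism would have to live in Materialize's dataflow management, which, as noted above, doesn't exist yet:

```rust
/// Hypothetical supervisor loop: instead of the sink stashing records and
/// retrying internally, the whole sink dataflow is torn down and rebuilt
/// whenever a transaction fails.
fn run_sink_with_restarts() {
    loop {
        match build_and_run_sink_dataflow() {
            // The sink was dropped or finished cleanly.
            Ok(()) => break,
            Err(err) => {
                // A failed/aborted transaction surfaces here. Aborted data is
                // never visible to read-committed consumers, so it is safe to
                // rebuild the dataflow and resume from the last committed
                // progress record.
                eprintln!("sink failed, restarting dataflow: {}", err);
            }
        }
    }
}

/// Placeholder for constructing the sink dataflow, streaming updates straight
/// through (no stashing), and running it until it fails or finishes.
fn build_and_run_sink_dataflow() -> Result<(), String> {
    Ok(())
}
```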

Optional, don't read!

Why can't we let the sink write from more than one worker?

All sink writing and stashing happens on only one worker. We do this to a) ensure global timestamp order and b) provide exactly-once guarantees. As mentioned above, we can relax a) but b) is harder to fix.

The sink needs to write everything from one worker because the data has to be written together with a progress record, which marks the time up to which we have written, as a single Kafka transaction.
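
Here is a hedged sketch of that single-worker write path using the rust-rdkafka transactional producer API. The topic names, the progress-record encoding, and the function itself are assumptions for illustration; the producer is assumed to have been configured with a `transactional.id` and to have had `init_transactions` called already:

```rust
use std::time::Duration;

use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

/// Write one batch of updates plus a progress record as a single Kafka
/// transaction: either all of it becomes visible to read-committed
/// consumers, or none of it does.
fn write_batch_transactionally(
    producer: &BaseProducer,
    updates: &[(Vec<u8>, Vec<u8>)], // already-encoded (key, value) pairs
    upper: u64,                     // time up to which we have now written
) -> Result<(), KafkaError> {
    producer.begin_transaction()?;

    // The data records themselves... (topic names are made up)
    for (key, value) in updates {
        producer
            .send(BaseRecord::to("sink-topic").key(key).payload(value))
            .map_err(|(e, _record)| e)?;
    }

    // ...and, in the same transaction, a progress record marking the time up
    // to which the data is complete. On restart, the sink reads the last
    // committed progress record to know where to resume.
    let progress = upper.to_be_bytes();
    producer
        .send(
            BaseRecord::to("sink-progress-topic")
                .key("progress")
                .payload(&progress[..]),
        )
        .map_err(|(e, _record)| e)?;

    // On any error above, the caller would abort the transaction (or, per the
    // earlier sketch, restart the whole sink dataflow).
    producer.commit_transaction(Duration::from_secs(30))
}
```

Because the progress record and the data share one transaction, a restarted sink that reads the last committed progress record can never resume from a time whose data was only partially written.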

Why isn't this a problem for Kafka Streams and Flink?

Kafka Streams can write out the data and update the consumer read offset as one transaction. Each parallel worker does that independently. When a failure happens, the worker just needs to pick up from the latest read offset. We can't do that because we always read all the data again on restart and cannot record read offsets per worker.

Flink also records read offsets and does a distributed two-phase commit for Kafka Sinks.

Getting multi-worker writes to work seems very desirable. Flattening the differential updates from an existing view into individual records and shipping them all to one worker is not very efficient and reduces the potential for distributed processing.

I think that approach wouldn't work with the current envelopes (UPSERT and DEBEZIUM) and exactly-once guarantees, unless we also introduce a two-phase commit protocol.

That approach would work well when we want to write CDCv2 output.

@elindsey

One other thing, mentioned only for completeness: an idea that was discussed early on was processing timestamps in order, but streaming out the data for a given timestamp as it was received. On bootstrap, the sink would rehydrate the data from the latest partially written timestamp to know which entries were already written and which still needed to be written. Ignoring issues around consistency, I think it would suffer from the same problem as today, except in reverse: if we streamed out 99% of a large timestamp, then crashed and restarted, we would have to retain that 99% of the data in memory to determine what new data should be sent.
