Tiny Log project

Goals

Below are the things we could implement, in descending order of priority.

Core

  • Each process should be part of a single cluster of processes
  • Each cluster should handle a single log
  • Processes should be lightweight
  • Adding and removing members from a cluster should be easy and foolproof
  • No need for a process to be able to change cluster membership; killing and starting a new one should be fine
  • Consuming is simple: a consumer can poll from an offset (at most n messages since offset x) or poll the latest message (see the sketch after this list)
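
The read side could be as small as two operations. A minimal sketch in Haskell, assuming hypothetical names (`Offset`, `Record`, `CoreLog`) that are not part of any existing implementation:

```haskell
-- Hypothetical sketch of the Core read API; all names are illustrative.
module TinyLog.Core.Api where

import Data.ByteString (ByteString)

newtype Offset = Offset Word deriving (Show, Eq, Ord)

data Record = Record
  { recordOffset  :: Offset
  , recordPayload :: ByteString
  } deriving (Show)

-- The two read operations listed above: poll up to n messages starting
-- at a given offset, or fetch only the latest message.
data CoreLog m = CoreLog
  { pollFrom   :: Offset -> Int -> m [Record]  -- at most n records since offset x
  , pollLatest :: m (Maybe Record)             -- latest message, if any
  }
```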

Consumer management layer

  • Separate from Core cluster
  • Knows list of available logs (each one represented by a Core cluster)
  • Accepts connections from consumers, many of which can be in the same group
  • Keeps track of the latest offset read by each consumer group for each log (maybe storing this in a Core log); see the sketch after this list
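
A minimal sketch of that state, assuming hypothetical names (`GroupOffsets`, `commitOffset`); whether it lives in memory or in a Core log is left open:

```haskell
-- Illustrative sketch of the consumer-management state: the latest offset
-- read by each consumer group for each log. Names are hypothetical.
module TinyLog.ConsumerMgmt.State where

import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

newtype LogId   = LogId String   deriving (Show, Eq, Ord)
newtype GroupId = GroupId String deriving (Show, Eq, Ord)
newtype Offset  = Offset Word    deriving (Show, Eq, Ord)

-- Per-group, per-log read positions.
type GroupOffsets = Map (GroupId, LogId) Offset

-- Record that a group has consumed up to a given offset; never move backwards.
commitOffset :: GroupId -> LogId -> Offset -> GroupOffsets -> GroupOffsets
commitOffset g l o = Map.insertWith max (g, l) o
```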

Instantiation management layer

  • Automates the creation, monitoring, and modification of Core logs, and keeps the Consumer Management layer updated about them
  • Maybe manage a Kubernetes cluster?

ZooKeeper Recipes and Solutions:

Distributed Consensus Reloaded: Apache ZooKeeper and Replication in Apache Kafka

https://www.confluent.io/blog/distributed-consensus-reloaded-apache-zookeeper-and-replication-in-kafka/

The ISR changes dynamically, and every time it changes, the new membership of the set is persisted to ZooKeeper. There are two important purposes that the ISR serves. First, all records written to a partition need to be acknowledged by this set before the leader declares them committed. Consequently, the ISR set must contain at least f + 1 replicas to be able to tolerate f crashes, and the value desired for f + 1 is set by configuration. Second, since the ISR has all the previously committed messages for a partition, to preserve consistency, the new leader must come from the latest ISR. Electing a leader from the latest ISR is important to guarantee that no committed message is lost during the leader transition.
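
The two rules can be stated very compactly. A sketch in Haskell, not Kafka's actual code; the function names are made up:

```haskell
-- Sketch of the two ISR rules described above; not Kafka's actual code.
import Data.Set (Set)
import qualified Data.Set as Set

type ReplicaId = Int

-- A record is committed only once every replica currently in the ISR
-- has acknowledged it.
isCommitted :: Set ReplicaId -> Set ReplicaId -> Bool
isCommitted isr acks = isr `Set.isSubsetOf` acks

-- To tolerate f crashes, the ISR must never shrink below f + 1 members.
minIsrSize :: Int -> Int
minIsrSize f = f + 1
```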

There is also an important difference between Kafka’s replication protocol and ZooKeeper’s replication protocol with respect to persistence. Because Kafka relies on ZooKeeper to be the “source of truth” for metadata, ZooKeeper must provide strong persistence guarantees. As such, it does not acknowledge a write request until it has synced data to the disks of a quorum of ZooKeeper servers. Because of the volume of data that Kafka brokers need to handle, they cannot afford to do the same and they do not sync partition data to disk. Messages of a partition are written to the corresponding file, but there is no call to fsync/fdatasync, which means that the data stays in the operating system page cache after the write, and is not necessarily flushed to disk media. This design choice has a huge positive impact on performance, but has the side effect that a recovering replica might not have some messages that it previously acknowledged.
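
The difference is essentially whether an append stops at the OS page cache or is forced all the way to disk. A rough Haskell illustration (names are made up; a ZooKeeper-style durable write would additionally need an explicit fsync, which this sketch deliberately omits):

```haskell
-- Kafka-style append: the data reaches the kernel's page cache but is not
-- necessarily on disk when we acknowledge. A crash can lose acknowledged
-- messages on this replica; durability comes from the other ISR members.
import System.IO

appendWithoutFsync :: FilePath -> String -> IO ()
appendWithoutFsync path msg =
  withFile path AppendMode $ \h -> do
    hPutStr h msg
    hFlush h  -- flushes Haskell's buffer to the OS, not the OS cache to disk
```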

For a write to be committed, all replicas in the ISR have to respond with an acknowledgement, not just any majority. Different from classic quorum systems, the size of the ISR is decoupled from the size of the replica set, which gives more flexibility in configuring the replica set. For example, we can have 11 replicas with a minimum ISR size of 3 (f = 2), whereas with majority quorums, having 11 replicas necessarily implies quorums of size 6.
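
The arithmetic in that example, spelled out (the function names are just for illustration):

```haskell
-- With 11 replicas, a majority quorum needs 6 acknowledgements per write,
-- while an ISR with a configured minimum of 3 (f = 2) can commit with 3.
majorityQuorum :: Int -> Int
majorityQuorum replicas = replicas `div` 2 + 1   -- majorityQuorum 11 == 6

minIsr :: Int -> Int
minIsr f = f + 1                                 -- minIsr 2 == 3
```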

Though the ISR scheme is more scalable and tolerates more failures, it is also more sensitive to the performance of a certain subset (the ISR) of the replicas. Where a majority-quorum-based scheme would merely ignore the slowest replica, this scheme pauses all writes to the partition until the slowest replica is removed from the ISR, if it was part of it. In most failure modes, replicas are removed quickly. For soft failures, unresponsive replicas are removed after a certain timeout; similarly, slow replicas are removed if they fall sufficiently behind the leader, as defined by a configuration.
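
The eviction decision itself is simple; the interesting part is the configuration it hinges on. A sketch, where `maxLagMs` stands in for a broker setting such as Kafka's `replica.lag.time.max.ms` (the data type and function names are invented):

```haskell
-- Illustrative ISR pruning: drop followers that have not caught up to the
-- leader within the configured window.
data Follower = Follower
  { followerId      :: Int
  , msSinceCaughtUp :: Int  -- time since this follower last matched the leader's log end
  }

shouldEvict :: Int -> Follower -> Bool
shouldEvict maxLagMs follower = msSinceCaughtUp follower > maxLagMs

pruneIsr :: Int -> [Follower] -> [Follower]
pruneIsr maxLagMs = filter (not . shouldEvict maxLagMs)
```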

In the Kafka replication protocol, consensus is disguised in the ISR update protocol. Because ZooKeeper ultimately exposes an atomic broadcast primitive, by storing the ISR information in ZooKeeper, one is essentially guaranteed agreement on the succession of ISR changes. When a replica recovers after a crash, it can go to ZooKeeper and find what the latest partition metadata (leader, ISR) is and synchronize accordingly to obtain the latest committed messages. Because all replicas agree (via ZooKeeper) on what the latest ISR is, there is no possibility of split brain scenarios.
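
The recovery path, reduced to its essentials (the metadata shape and function names are assumptions; the actual ZooKeeper znode layout and client calls are abstracted behind the two arguments):

```haskell
-- Sketch of replica recovery: read the latest (leader, ISR) metadata that
-- ZooKeeper agreed on, then truncate/fetch from that leader.
data PartitionMeta = PartitionMeta
  { leaderId :: Int
  , isr      :: [Int]
  }

recoverReplica ::
     IO PartitionMeta   -- read the partition's state znode from ZooKeeper
  -> (Int -> IO ())     -- truncate the local log and catch up from this leader
  -> IO ()
recoverReplica readMeta syncWithLeader = do
  meta <- readMeta
  -- Because ZooKeeper serialises ISR updates, every recovering replica sees
  -- the same leader, so there is no split-brain.
  syncWithLeader (leaderId meta)
```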

Interestingly, other systems have opted for replicating logs directly on a replicated state machine implementation using Paxos [12]. This is, in fact, an example of a scenario in which ZooKeeper is clearly not a good choice. Storing logs directly implies a large volume of writes and a large amount of data (ZooKeeper servers store data in memory and the service uses quorum replication for writes), so it made sense to develop systems like Kafka and BookKeeper for log replication on top of ZooKeeper. Both styles of design have worked well in practice, but we are clearly biased towards the use of a system like ZooKeeper because it enables both the design and implementation of more flexible schemes (e.g., for replication) by exposing agreement through a file-system like API, and a clear separation between data and metadata.

[12] https://ai.google/research/pubs/pub36971

Other articles
