
jrwest/ Secret

Last active Oct 14, 2020

Cluster Metadata

The following is intended for those wishing to use the Cluster Metadata subsystem inside of Riak or another riak_core application. It is not intended for those wishing to support, debug or contribute to the subsystem, although it may be helpful.

1. Overview

Cluster Metadata is intended to be used by riak_core applications wishing to work with information stored cluster-wide. It is useful for storing application metadata or any information that needs to be read without blocking on communication over the network.

1.1 Data Model

Cluster Metadata is a key-value store. It treats values as opaque Erlang terms that are fully addressed by their "Full Prefix" and "Key". A Full Prefix is a {atom() | binary(), atom() | binary()}, while a Key is any Erlang term. The first element of the Full Prefix is referred to as the "Prefix" and the second as the "Sub-Prefix".
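The addressing scheme above can be sketched as a small, self-contained module. This is only an illustration of the shapes described in the text; the module name `cm_data_model` and both functions are hypothetical helpers, not part of riak_core.

```erlang
-module(cm_data_model).
-export([is_full_prefix/1, address/3]).

%% A Full Prefix is a 2-tuple {Prefix, SubPrefix} whose elements are
%% each an atom or a binary.
is_full_prefix({Prefix, SubPrefix}) ->
    is_prefix_part(Prefix) andalso is_prefix_part(SubPrefix);
is_full_prefix(_) ->
    false.

is_prefix_part(P) ->
    is_atom(P) orelse is_binary(P).

%% Build the complete address of a value: {FullPrefix, Key}.
%% The Key may be any Erlang term. Crashes (badmatch) if the
%% prefix parts are not atoms or binaries.
address(Prefix, SubPrefix, Key) ->
    FullPrefix = {Prefix, SubPrefix},
    true = is_full_prefix(FullPrefix),
    {FullPrefix, Key}.
```

For example, `cm_data_model:address(my_app, users, {id, 7})` yields `{{my_app, users}, {id, 7}}`, where `my_app` and `users` are made-up names.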

1.2 Storage

Values are stored on-disk and a full copy is also maintained in-memory. This allows reads to be performed only from memory, while writes are applied to both media.

1.3 Consistency

Updates in Cluster Metadata are eventually consistent. Writing a value requires acknowledgment from only a single node and, as previously mentioned, reads return values from the local node only.

1.4 Replication

Updates are replicated to every node in the cluster, including nodes that join the cluster after the update has already reached all nodes in the previous set of members.

2. API

The interface to Cluster Metadata is provided by the riak_core_metadata module. The module's documentation is the official source for information about the API, but some details are re-iterated here.

2.1 Reading and Writing

Reading the local value for a key can be done with the get/2,3 functions. Like most riak_core_metadata functions, the higher arity version takes a set of possible options, while the lower arity function uses the defaults.

Updating a key is done using put/3,4. Performing a put only blocks until the write is applied locally. The broadcast of the update is done asynchronously.

2.1.1 Deleting Keys

Deletion of keys is logical and tombstones are not reaped. delete/2,3 act the same as put/3,4 with respect to blocking and broadcast.

2.2 Iterators

Along with reading individual keys, the API also allows Full Prefixes to be iterated over. Iterators can visit both keys and values. They are not ordered, nor are they read-isolated. However, they do guarantee that each key is seen at most once for the lifetime of an iterator.

See iterator/2 and the itr_* functions.

2.3 Conflict Resolution

Conflict resolution can be done on read or write.

On read, if the conflict is resolved, an option, allow_put, passed to get/3 or iterator/2 controls whether or not the resolved value will be written back to local storage and broadcast asynchronously.

On write, conflicts are resolved by passing a function instead of a new value to put/3,4. The function is passed the list of existing values and can use this and values captured within the closure to produce a new value to store.
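A minimal sketch of such a function follows. It assumes each stored value is a list of members and merges all existing siblings before adding one more. The module name `cm_write_resolver`, the choice of value shape, and the handling of `undefined` (used here for "nothing stored yet") are assumptions made for illustration; check the riak_core_metadata documentation for what the function is actually passed.

```erlang
-module(cm_write_resolver).
-export([add_member/1]).

%% Returns a fun that can be passed in place of a new value.
%% Member is captured in the closure. The fun merges all existing
%% values (each assumed to be a list of members) and adds Member.
add_member(Member) ->
    fun(undefined) ->
            %% Assumption: called this way when no value exists yet.
            [Member];
       (Existing) when is_list(Existing) ->
            %% Skip anything that is not a list of members.
            Lists = [V || V <- Existing, is_list(V)],
            Merged = ordsets:union([ordsets:from_list(L) || L <- Lists]),
            ordsets:add_element(Member, Merged)
    end.
```

With the put/3 arity described above, this might be used as `riak_core_metadata:put(FullPrefix, Key, cm_write_resolver:add_member(Member))`. Because the result is a single merged value, the siblings collapse into one on write.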

2.4 Detecting Changes in Groups of Keys

The prefix_hash/1 function can be polled to determine when groups of keys, by Prefix or Full Prefix, have changed.
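One way to structure such polling is to remember the last hash seen and compare it with the current one. The sketch below takes the hash function as an argument so that it stays self-contained; the module name `cm_change_poll` and the return values are hypothetical, and nothing is assumed about the type of the hash beyond it being comparable.

```erlang
-module(cm_change_poll).
-export([check/3]).

%% HashFun is a 1-arity fun returning the current hash for Prefix,
%% e.g. fun riak_core_metadata:prefix_hash/1 on a running node.
%% LastHash is the hash seen by the previous poll.
check(HashFun, Prefix, LastHash) ->
    case HashFun(Prefix) of
        LastHash -> unchanged;          %% same hash as last time
        NewHash  -> {changed, NewHash}  %% caller stores NewHash
    end.
```

A caller would typically invoke `check/3` from a timer and keep the returned hash in its own state for the next poll.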

3. Common Pitfalls & Other Notes

The following is by no means a complete list of things to keep in mind when developing on top of Cluster Metadata.

3.1 Storage Limitations

Cluster Metadata uses dets for on-disk storage. There is a dets table per Full Prefix, which limits the amount of data stored under each Full Prefix to 2GB. This size includes the overhead of information stored alongside values, such as the logical clock and key.

Since a full copy of the data is kept in-memory, memory usage must also be considered.
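To keep an eye on the on-disk limit, the size of a dets file can be compared against it. The following is a rough sketch: it treats the limit as exactly 2^31 bytes (an approximation of the 2GB figure above), and the module name `cm_dets_headroom` is hypothetical. How the Metadata Manager names its dets tables is not covered here, so `Tab` stands for whatever table name or reference is available.

```erlang
-module(cm_dets_headroom).
-export([fraction_used/1, table_fraction_used/1]).

%% Assumption: the limit is taken to be 2^31 bytes.
-define(DETS_LIMIT_BYTES, (2 * 1024 * 1024 * 1024)).

%% Fraction of the limit used by a file of the given size in bytes.
fraction_used(FileSizeBytes)
  when is_integer(FileSizeBytes), FileSizeBytes >= 0 ->
    FileSizeBytes / ?DETS_LIMIT_BYTES.

%% Same, for an open dets table. dets:info(Tab, file_size) returns
%% the size of the table's file in bytes.
table_fraction_used(Tab) ->
    fraction_used(dets:info(Tab, file_size)).
```

The file size includes the per-object overhead mentioned above, so it is the right quantity to compare against the limit.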

3.2 Replication Limitations

Cluster Metadata uses disterl for message delivery, like most Erlang applications. Standard caveats and issues with large and/or too frequent messages still apply.

3.3 Last-Write Wins

The default conflict resolution strategy on read is last-write-wins. The usual caveats about the dangers of this method apply.

3.4 "Pathological Eventual Consistency"

Extremely frequent writing back of resolved values after read, in an eventually consistent store where acknowledgment is only required from one node for both types of operations, can lead to an interesting pathological case where siblings continue to be produced (although the set does not grow unbounded). A very rough exploration of this can be found here.

If a riak_core application is likely to have concurrent writes and wishes to read extremely frequently, e.g. in the Riak request path, it may be advisable to use {allow_put, false} with get/3.

4. Configuration Details

Needs Content

4.1 Data Directories

Needs Content

See riak_core_metadata_manager:start_link/1.

4.2 Broadcast Timers

Needs Content

Cluster Metadata Internals

The following describes Cluster Metadata from an implementor's perspective. It is intended for those wishing to familiarize themselves with the implementation in order to make changes or to support the subsystem. This document is not intended for those wishing to use Cluster Metadata but may be helpful.

1. Structure

The Cluster Metadata subsystem is divided into several parts spread across multiple modules. These parts can be divided into three categories: representation, storage and broadcast. Unifying them is the API, which presents the subsystem as one cohesive unit to clients.

Cluster Metadata Structure

1.1 Representation

Cluster Metadata stores and represents values internally in a different form than how it is presented to clients. Most notably, clients are not exposed to the logical clocks stored alongside values while internally they are used heavily.

The riak_core_metadata_object module is responsible for defining the internal representation -- referred to as the "Metadata Object". In addition, it provides a set of operations that can be performed on Metadata Object(s). These operations include conflict resolution, reconciling differing versions of an object, and accessing the values, the clock and additional information about the object. Outside this module, the structure of the Metadata Object is opaque and all operations on objects should use it.

1.1.1 Logical Clocks

The logical clocks used by Cluster Metadata are Dotted Version Vectors. The implementation is contained within dvvset.erl.

1.2 Storage

Data is stored both in-memory, using ets, and on-disk, using dets. Each node locally manages its own storage via the riak_core_metadata_manager, or "Metadata Manager", process running on it.

The Metadata Manager is not responsible for replicating data, nor is it aware of other nodes. However, it is aware that writes are causally related by their logical clocks, and it uses this information to make local storage decisions, e.g. when an object is causally older than the value already stored. Data read out of the Metadata Manager is a Metadata Object, while data written is either a Metadata Object or a raw value and a view of the logical clock (the view contains enough details to reconstruct the information in the original clock).

1.2.1 Metadata Hashtrees

In addition to storing Metadata Objects, the subsystem maintains Hash Trees that are the basis for determining differences between data stored on two nodes and can be used to determine when individual or groups of keys change.

The Hash Trees that are used are implemented in the hashtree_tree module. More details on its implementation can be found there.

Each node maintains its own hashtree_tree via the riak_core_metadata_hashtree, or Metadata Hashtree, process. This process is responsible for safely applying different operations on the tree, in addition to exposing information needed elsewhere in the subsystem and to clients. It also periodically updates the tree, when not in use by other parts of the subsystem, so that clients using it to detect changed keys can do so without forcing the update themselves.

Metadata Hashtrees are only maintained for the lifetime of a running node. They are rebuilt each time a node starts and are never rebuilt for as long as the node continues to run. The initial design of this part of the system included using memory-backed Hash Trees. However, they are currently backed by LevelDB and cleared on graceful shutdown and each time the node is started. The entire tree is stored within one LevelDB instance.

The Metadata Manager updates the hashes for keys using riak_core_metadata_hashtree in its write path. The upper levels of the tree will only reflect those changes when the tree is updated -- either periodically or when needed by another part of the subsystem.

1.3 Broadcast

The above details focus on Cluster Metadata from the point of view of a single node. It is the broadcast mechanism that puts the "Cluster" in "Cluster Metadata". This mechanism was built to be general and is, in and of itself, a separate subsystem. More details about how it works can be found in the documentation about riak_core_broadcast. The following describes how Cluster Metadata uses this subsystem.

The riak_core_broadcast_handler behaviour for Cluster Metadata is the Metadata Manager. In addition to the simple get/put interface, it implements all the required broadcast functions.

The majority of the riak_core_broadcast_handler functions are well-defined; however, exchange/1 allows a lot of leeway in how to implement the repair of data between the local node and a remote peer. To implement exchange/1, Cluster Metadata uses the Metadata Hashtrees on both nodes to determine differences between data stored on each and the existing riak_core_metadata_manager interface to repair those differences. The implementation of this logic can be found in riak_core_metadata_exchange_fsm.

1.4 API

The API, contained within the riak_core_metadata module, ties together the above parts and is the only interface that should be used by clients. It is responsible not only for hiding the logical clock and internal representation of the object but also for performing writes via the Metadata Manager and submitting broadcasts of updates and resolved values.

2. Future Work & Known Issues

Known outstanding work and issues can be found under the riak_core repository's issue list. All issues pertaining to this subsystem have (and should be created with) the Cluster Metadata tag.

3. Tips for Debugging and Resolving Issues

The following is a list of debugging and testing tips as well as ways to resolve unforeseen issues. It is by no means a complete list but it is, hopefully, a growing one.

3.1 Viewing the Broadcast Tree

It may be useful to know what nodes are being communicated with when replicating updates across the cluster for a given host. riak_core_broadcast's debug_get_peers/2,3 and debug_get_tree/2 expose this information without needing to inspect the process state of riak_core_broadcast. The documentation of those functions has more details.

3.2 Retrieving Objects not Values

The API hides the internal representation of objects for the client. However, when debugging and testing, it is sometimes useful to see what the Metadata Manager actually has stored, especially the logical clock. In this case riak_core_metadata_manager:get/1,2 can be used to retrieve objects locally or from the Metadata Manager on another node.

3.3 Forcing Conflicting Writes

Although not typically a good thing to do, for testing purposes it's useful to know how to force writing of siblings. Using the Metadata Manager this can be done easily by writing values with no logical clock:

riak_core_metadata_manager:put({FullPrefix, Key}, undefined,
riak_core_metadata_manager:put({FullPrefix, Key}, undefined,

The two calls can be run on the same or different nodes. If run on different nodes the siblings won't appear until after an exchange has completed.

3.4 Managing Exchanges

Most of the following is general to the broadcast mechanism but is useful within the context of Cluster Metadata.

3.4.1 Querying Exchanges Started by a Node

The functions riak_core_broadcast:exchanges/0,1 can be used to determine which exchanges are running that were started on the current or given node, respectively. These functions return a list of 4-tuples. The 4-tuple contains the name of the module used as the riak_core_broadcast_handler for the exchange (always riak_core_metadata_manager for Cluster Metadata), the node that is the peer of the exchange, a unique reference representing the exchange, and the process id of the exchange fsm doing the work.
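As a small illustration of working with that return value, the sketch below picks out the peers of all running exchanges for a given handler module. It relies only on the 4-tuple layout described above; the module name `cm_exchange_report` is hypothetical.

```erlang
-module(cm_exchange_report).
-export([peers/2]).

%% Exchanges is a list of {Mod, Peer, Ref, Pid} 4-tuples, in the
%% layout described above. Returns the peers that currently have an
%% exchange running for the handler module Mod.
peers(Mod, Exchanges) ->
    [Peer || {M, Peer, _Ref, _Pid} <- Exchanges, M =:= Mod].
```

On a running node this might be called as `cm_exchange_report:peers(riak_core_metadata_manager, riak_core_broadcast:exchanges())`.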

3.4.2 Canceling Exchanges

Running exchanges may be aborted using riak_core_broadcast:cancel_exchanges/1. cancel_exchanges/1's argument takes several forms.

To cancel all Cluster Metadata exchanges started by the node where this command is run:

riak_core_broadcast:cancel_exchanges({mod, riak_core_metadata_manager}).

To cancel all exchanges with a given peer that were started by the node where this command is run:

riak_core_broadcast:cancel_exchanges({peer, Peer}).

where Peer is the Erlang node name of the remote node.

To cancel a specific exchange started by the node where this command is run:

riak_core_broadcast:cancel_exchanges(PidOrRef).

where PidOrRef is a process id or unique reference found in a 4-tuple returned from riak_core_broadcast:exchanges/0.

3.4.3 Manually Triggering Exchanges

If it is necessary to ensure that the data stored in Cluster Metadata is the same between two nodes, an exchange can be triggered manually. These exchanges are triggered by the broadcast mechanism so, typically, this should not be necessary. However, it can be done by running the following from an Erlang node running Cluster Metadata:


Where Peer is the Erlang node name for another node in the cluster. This will trigger an exchange between the node the command was run on and Peer. This function does not block until the completion of the exchange. It only starts it. If the exchange was started successfully, {ok, Pid} will be returned, where Pid is the process id of the exchange fsm doing the work in the background. If the exchange could not be started {error, Reason} will be returned.

3.5 Repairing DETS Files

DETS files may need to be repaired if a node exits abnormally and the Metadata Manager was not allowed to properly close its files. When the Metadata Manager restarts it will repair the files when they are opened (this is the default DETS behavior).

More details on DETS repair can be found in the docs.

3.6 Removing Metadata Hashtree Files

Metadata Hashtrees are meant to be ephemeral, so they may be deleted without affecting a cluster. The node must first be shut down, the files removed, and the node then restarted. This ensures that the Metadata Hashtree process does not crash when the files are deleted and that the tree is rebuilt when the node starts back up.


The following describes the broadcast subsystem implemented by riak_core_broadcast. It is intended for those wishing to use the subsystem as well as support and modify it.

1. Overview

riak_core_broadcast provides a generalized mechanism for ensured eventual delivery of messages to all nodes in a riak_core cluster. The development of this subsystem was driven primarily by the implementation of Cluster Metadata; however, it was built for use in other applications as well. Because of this generalization, the client application/subsystem is required to provide certain guarantees in order to ensure 100% reliability when delivering messages.

Broadcasted messages are opaque to riak_core_broadcast. The only requirement is that each message must be uniquely identifiable. The storage of messages is the responsibility of the application/subsystem using riak_core_broadcast. For more details about what is required when using riak_core_broadcast see Broadcast Handlers.

To deliver messages riak_core_broadcast sends them along the paths of a directed, possibly cyclic, graph of all nodes in the cluster. In most cases, however, the graph will be a spanning tree. Only in the cases where membership changes or failures occur will a less optimal path be used. Keep reading for more details.

riak_core_broadcast provides the following guarantee regarding message delivery -- assuming the application/subsystem using it meets the necessary requirements (see Broadcast Handlers):

A broadcasted message will be delivered to all nodes *at least
once*. There is one exception: when the node a broadcast originates on is
removed before it can deliver the payload to at least one other
node, this guarantee is not met.

Exploration of ways to mitigate the exception mentioned is being tracked in this issue.

2. Implementation

riak_core_broadcast is a hybrid of deterministic broadcast and stochastic epidemic protocols: a self-healing spanning tree. The primary path for the delivery of messages is to broadcast using the spanning tree but when links fail, they are healed by using pull-based gossip [1]. Sometimes, the spanning tree may have additional links, making it a directed, and possibly cyclic, graph while it is being healed or when membership changes. In addition, the entire mechanism is backed by a standard anti-entropy epidemic [1].

2.1 Plumtree

The implementation is based upon "Plumtree" [2], which specifies an algorithm for a self-healing spanning tree. The paper [2] has a clear and concise description of the algorithm so none will be recreated here.

Plumtree was chosen, primarily, because of its efficiency in the stable case while maintaining reliability. When the graph of nodes is a proper spanning tree, each node will receive the payload exactly once. In failure cases, nodes may receive the payload more than once, but the act of doing so heals the tree and removes redundant links.

2.2 Differences & Extensions

riak_core_broadcast extends what is specified in the Plumtree Protocol. The requirements of the implementation in [2] and riak_core_broadcast differ in several ways. The following describes those differences and their effect on the implementation.

2.2.1 Peer Service

In large-scale, partially connected eventual delivery protocols it is common to separate responsibilities into two parts: the peer service and the message overlay. The latter is what Plumtree implements and it is responsible for providing a reliable and hopefully efficient delivery mechanism for most messages. The former is responsible for ensuring delivery of critical data like membership (among other things not relevant to this document) and may do so less efficiently in the interest of safety. The message overlay relies on the peer service and makes certain assumptions about it. These assumptions are the source of many of the differences between the implementation specified in [2] and riak_core_broadcast. The Peer Service in [2] is HyParView [3]. In riak_core it is the existing riak_core_gossip.

The assumptions made in [2] are discussed in section 3.3 of the paper. The "Connectivity" requirement of the service is not one that can be provided by Riak. Namely, a node must be able to be a singleton during a long lived partition and still ensure the delivery to all nodes. The implementation in [2] assumes in this case that the peer service will fail this node out of the overlay and that failure to deliver messages to it or sent from it is acceptable. We do not.

The paper is also focused on clusters on the order of tens of thousands of nodes, whereas Riak clusters typically range from 5 to 100 nodes. The Peer Service for riak_core_broadcast is fully-connected -- each node is aware of every other node in the cluster and can communicate with them directly outside of riak_core_broadcast. This affects how membership is handled.

2.2.2 Historical Delivery

The reliability of Plumtree is measured in [2] by determining how many healthy nodes receive the message, where healthy nodes are defined to be nodes that have not failed while there are protocol messages related to the broadcast "in the network". The reliability of riak_core_broadcast, however, must define healthy nodes as those that have not permanently failed or been removed from the cluster.

When considering dynamic membership, the definition of healthy nodes in [2] is extended to include nodes that joined while there were still protocol messages related to the broadcast "in the network". riak_core_broadcast extends this further to include any node which may join in the future, despite protocol messages no longer being "in the network".

This requirement can be thought of as "Historical Delivery".

2.2.3 Lazy Message Queuing

Plumtree is a reliable protocol in the face of massive node failures, which was the focus of the research; however, it is not tolerant to network partitions. This is hinted at in the "Connectivity" requirement [2] of Plumtree's Peer Service and can be proven using a formal model. To address this, riak_core_broadcast extends how the set of unsent lazy (or "i have") messages is maintained and adds an additional message: "ignore i have".

The original algorithm removes the record of an "i have" message, used to indicate it needs to be sent, as soon as one attempt is made to send it. This is a problem when messages are dropped by the network. riak_core_broadcast maintains this record after the attempt and periodically scans the set of them and re-sends.

The record is removed from the set when an "ignore i have" message is received. This message is sent by the peer who received the "i have" and indicates that no graft is necessary.
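The bookkeeping described in this section can be modeled with a small set of outstanding records. The sketch below is a simplified model for illustration only, not the riak_core_broadcast implementation; the module name `lazy_queue_sketch`, the function names, and the `{MessageId, Peer}` record shape are all assumptions.

```erlang
-module(lazy_queue_sketch).
-export([new/0, queue_i_have/3, ignore_i_have/3, due_for_resend/1]).

%% Outstanding "i have" records, kept as an ordered set of
%% {MessageId, Peer} pairs.
new() ->
    ordsets:new().

%% Record that Peer must be told we have MessageId. Unlike the
%% original algorithm, the record is kept after the send attempt.
queue_i_have(MessageId, Peer, Outstanding) ->
    ordsets:add_element({MessageId, Peer}, Outstanding).

%% Peer answered with "ignore i have": no graft is necessary,
%% so the record can be dropped.
ignore_i_have(MessageId, Peer, Outstanding) ->
    ordsets:del_element({MessageId, Peer}, Outstanding).

%% Periodic scan: every record still outstanding is sent again.
due_for_resend(Outstanding) ->
    ordsets:to_list(Outstanding).
```

In this model, a dropped "i have" simply stays in the set and shows up again on the next scan, which is the property the extension is after.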

2.2.4 Anti-Entropy

In cases where a large percentage of nodes fail, the union of all sets of lazy links between nodes, which heal the spanning tree, will no longer contain links between all nodes. When this happens, messages may not be delivered to all healthy nodes even under the less strict definition used in [2]. Although [2] discusses a similar failure scenario, the two are not quite the same because of the Peer Service. The differences, however, are not worth exploring further.

Although high failure percentages are not so common in riak_core applications, it is still an unacceptable failure scenario. riak_core_broadcast uses a classic anti-entropy epidemic [1] to ensure that each node eventually checks in with every other node in the cluster and delivers any missing messages. These are referred to as "Exchanges".

This also ensures that riak_core_broadcast maintains Historical Delivery.

2.2.5 Initial Peers

Since the Peer Services of the two implementations differ, so does their generation of the set of initial eager and lazy peers for each node. riak_core_broadcast uses riak_core_util:build_tree/3. See riak_core_broadcast:init_peers/1 for more details.
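To give a feel for how a tree can be derived from a fully known member list, the sketch below lays an N-ary tree over an ordered list of nodes. It is an independent illustration and not a copy of riak_core_util:build_tree/3, whose options and exact output are not described here; the module name `tree_sketch` and its return shape are assumptions.

```erlang
-module(tree_sketch).
-export([build/2]).

%% Lay an N-ary tree over Nodes in list order: the first node is the
%% root, and each node in turn takes up to N not-yet-assigned nodes
%% as its children. Returns [{Node, Children}] in the order of Nodes.
build(N, [_Root | Rest] = Nodes) when is_integer(N), N > 0 ->
    {Tree, _} =
        lists:foldl(
          fun(Node, {Acc, Unassigned}) ->
                  Take = min(N, length(Unassigned)),
                  {Children, Remaining} = lists:split(Take, Unassigned),
                  {[{Node, Children} | Acc], Remaining}
          end,
          {[], Rest},
          Nodes),
    lists:reverse(Tree);
build(_N, []) ->
    [].
```

Because every node can compute the same tree from the same ordered member list, no coordination is needed to agree on it, which is what a fully-connected Peer Service makes possible.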

2.2.6 Dynamic Membership

Since riak_core_gossip is fully-connected, unlike HyParView [3], how the system deals with membership changes has also been modified. riak_core_broadcast simply regenerates the initial peers, as if it were restarted with the new membership information, and allows the protocol to modify things as needed.

3. Broadcast Handlers

This section needs content

4. References

