The following is intended for those wishing to use the Cluster
Metadata subsystem inside Riak or another riak_core
application. It is not aimed at those wishing to support, debug
or contribute to the subsystem, although they may still find it helpful.
Cluster Metadata is intended to be used by riak_core applications
wishing to work with information stored cluster-wide. It is useful for
storing application metadata or any information that needs to be
read without blocking on communication over the network.
1.1 Data Model
Cluster Metadata is a key-value store. It treats values as opaque
Erlang terms that are fully addressed by their "Full Prefix" and
"Key". A Full Prefix is a {atom() | binary(), atom() | binary()},
while a Key is any Erlang term. The first element of the Full Prefix
is referred to as the "Prefix" and the second as the "Sub-Prefix".
1.2 Storage
Values are stored on-disk and a full copy is also maintained
in-memory. This allows reads to be served entirely from memory, while
writes are effected in both mediums.
1.3 Consistency
Updates in Cluster Metadata are eventually consistent. Writing a value
requires acknowledgment from only a single node and, as previously
mentioned, reads return values from the local node only.
1.4 Replication
Updates are replicated to every node in the cluster, including nodes
that join the cluster after the update has already reached all nodes
in the previous set of members.
2. API
The interface to Cluster Metadata is provided by the
riak_core_metadata
module. The module's documentation is the official source for
information about the API, but some details are re-iterated here.
2.1 Reading and Writing
Reading the local value for a key can be done with the get/2,3
functions. Like most riak_core_metadata functions, the higher arity
version takes a set of possible options, while the lower arity
function uses the defaults.
Updating a key is done using put/3,4. Performing a put blocks only
until the write is effected locally. The broadcast of the update is
done asynchronously.
2.1.1 Deleting Keys
Deletion of keys is logical, and tombstones are not
reaped. delete/2,3 act the same as put/3,4 with respect to
blocking and broadcast.
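As an illustrative sketch, the basic write/read/delete cycle from an
Erlang shell on a node running the subsystem might look like the
following (the {myapp, settings} prefix, the timeout key and the 5000
value are made up for this example):

```erlang
%% Illustrative prefix/key/value; any {atom() | binary(), atom() | binary()}
%% Full Prefix and any Erlang term as key will do.
FullPrefix = {myapp, settings},
ok = riak_core_metadata:put(FullPrefix, timeout, 5000),
5000 = riak_core_metadata:get(FullPrefix, timeout),
%% Deletes are logical: the key is replaced by a tombstone locally and
%% the deletion is broadcast like any other update.
ok = riak_core_metadata:delete(FullPrefix, timeout),
undefined = riak_core_metadata:get(FullPrefix, timeout).
```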
2.2 Iterators
Along with reading individual keys, the API also allows Full Prefixes
to be iterated over. Iterators can visit both keys and values. They
are not ordered, nor are they read-isolated. However, they do
guarantee that each key is seen at most once for the lifetime of an
iterator.
See iterator/2 and the itr_* functions.
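A sketch of a fold over a Full Prefix using the iterator API follows
(the itr_* call sequence is per the module's documentation; the
{myapp, settings} prefix is illustrative):

```erlang
%% Collect every {Key, Values} pair stored under a Full Prefix.
%% Values is a list when unresolved siblings exist.
Collect = fun Loop(It, Acc) ->
              case riak_core_metadata:itr_done(It) of
                  true ->
                      riak_core_metadata:itr_close(It),
                      lists:reverse(Acc);
                  false ->
                      KV = riak_core_metadata:itr_key_values(It),
                      Loop(riak_core_metadata:itr_next(It), [KV | Acc])
              end
          end,
It = riak_core_metadata:iterator({myapp, settings}, []),
Pairs = Collect(It, []).
```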
2.3 Conflict Resolution
Conflict resolution can be done on read or write.
On read, when a conflict is resolved, the allow_put option passed
to get/3 or iterator/2 controls whether or not the resolved value
is written back to local storage and broadcast asynchronously.
On write, conflicts are resolved by passing a function instead of a
new value to put/3,4. The function is passed the list of existing
values and can use this and values captured within the closure to
produce a new value to store.
2.4 Detecting Changes in Groups of Keys
The prefix_hash/1 function can be polled to determine when groups
of keys, by Prefix or Full Prefix, have changed.
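A sketch of such polling (the prefix is illustrative):

```erlang
%% A hash over everything stored under the given Prefix or Full
%% Prefix. Comparing hashes across polls detects that some key under
%% it changed, though not which one.
Before = riak_core_metadata:prefix_hash({myapp, settings}),
%% ... some time later ...
Changed = riak_core_metadata:prefix_hash({myapp, settings}) =/= Before.
```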
3. Common Pitfalls & Other Notes
The following is by no means a complete list of things to keep in mind
when developing on top of Cluster Metadata.
3.1 Storage Limitations
Cluster Metadata uses dets for on-disk storage. There is a dets
table per Full Prefix, which limits the amount of data stored under
each Full Prefix to 2GB. This size includes the overhead of
information stored alongside values, such as the logical clock and
key.
Since a full copy of the data is also kept in-memory, its memory
footprint must be considered as well.
3.2 Replication Limitations
Cluster Metadata uses disterl for message delivery, like most Erlang
applications. Standard caveats and issues with large and/or too
frequent messages still apply.
3.3 Last-Write Wins
The default conflict resolution strategy on read is
last-write-wins. The usual caveats about the dangers of this method
apply.
3.4 "Pathological Eventual Consistency"
The extremely frequent writing back of resolved values after read in
an eventually consistent store where acknowledgment is only required
from one node for both types of operations can lead to an interesting
pathological case where siblings continue to be produce (although the
set does not grow unbounded). A very rough exploration of this can be
found here.
If a riak_core application is likely to have concurrent writes and
wishes to read extremely frequently, e.g. in the Riak request path, it
may be advisable to use {allow_put, false} with get/3.
The following describes Cluster Metadata from an implementor's
perspective. It is intended for those wishing to familiarize
themselves with the implementation in order to make changes or to
support the subsystem. This document is not aimed at those wishing
to use Cluster Metadata, though they may still find it helpful.
The Cluster Metadata subsystem is divided into several parts spread
across multiple modules. These parts can be divided into three
categories: representation, storage and broadcast. Unifying them is
the API, which presents the subsystem as one cohesive unit to clients.
1.1 Representation
Cluster Metadata stores and represents values internally in a
different form than how it is presented to clients. Most notably,
clients are not exposed to the logical clocks stored alongside values
while internally they are used heavily.
The
riak_core_metadata_object
module is responsible for defining the internal representation --
referred to as the "Metadata Object". In addition, it provides a set
of operations that can be performed on Metadata Object(s). These
operations include conflict resolution, reconciling differing versions
of an object, and accessing the values, the clock and additional
information about the object. Outside this module, the
structure of the Metadata Object is opaque, and all operations on
objects should go through it.
1.1.1 Logical Clocks
The logical clocks used by Cluster Metadata are
Dotted Version Vectors. The
implementation is contained within dvvset.erl.
1.2 Storage
Data is stored both in-memory, using ets, and on-disk, using
dets. Each node locally manages its own storage via the
riak_core_metadata_manager, or "Metadata Manager", process running on it.
The Metadata Manager is not responsible for replicating data, nor is
it aware of other nodes. However, it is aware that writes are causally
related by their logical clocks, and it uses this information to make
local storage decisions, e.g. when an object is causally older than
the value already stored. Data read out of the Metadata Manager is a
Metadata Object, while data written is either a Metadata Object or a
raw value together with a view of the logical clock (the view contains
enough details to reconstruct the information in the original clock).
1.2.1 Metadata Hashtrees
In addition to storing Metadata Objects, the subsystem maintains Hash
Trees that are the basis for determining differences between data
stored on two nodes and can be used to determine when individual or
groups of keys change.
The Hash Trees that are used are implemented in the
hashtree_tree
module. More details on its implementation can be found there.
Each node maintains its own hashtree_tree via the
riak_core_metadata_hashtree, or Metadata Hashtree, process. This
process is responsible for safely applying different operations on the
tree, in addition to exposing information needed elsewhere in the
subsystem and to clients. It also periodically updates the tree, when
not in use by other parts of the subsystem, so that clients using it
to detect changed keys can do so without forcing the update themselves.
Metadata Hashtrees are only maintained for the lifetime of a running
node. They are rebuilt each time a node starts and are never rebuilt
for as long as the node continues to run. The initial design of this
part of the system called for memory-backed Hash Trees. However,
they are currently backed by LevelDB and cleared on graceful shutdown
and each time the node is started. The entire tree is stored within
one LevelDB instance.
The Metadata Manager updates the hashes for keys using
riak_core_metadata_hashtree in its write path. The upper levels of
the tree will only reflect those changes when the tree is updated --
either periodically or when needed by another part of the subsystem.
1.3 Broadcast
The above details focus on Cluster Metadata from the point of view of
a single node. It is the broadcast mechanism that puts the "Cluster"
in "Cluster Metadata". This mechanism was built to be general and is
in and of itself a separate subsystem. More details about how it works
can be found in the documentation about
riak_core_broadcast. The
following describes how Cluster Metadata uses this subsystem.
The riak_core_broadcast_handler behaviour for Cluster Metadata is
the Metadata Manager. In addition to the simple get/put interface, it
implements all the required broadcast functions.
The majority of the riak_core_broadcast_handler functions are
well-defined; however, exchange/1 allows a lot of leeway in how to
implement the repair of data between the local node and a remote
peer. To implement exchange/1, Cluster Metadata uses the Metadata
Hashtrees on both nodes to determine differences between data stored
on each and the existing riak_core_metadata_manager interface to
repair those differences. The implementation of this logic can be
found in riak_core_metadata_exchange_fsm.
1.4 API
The API, contained within the
riak_core_metadata
module, ties together the above parts and is the only interface that
should be used by clients. It is not only responsible for hiding the
logical clock and internal representation of the object, but also for
performing writes via the Metadata Manager and submitting
broadcasts of updates and resolved values.
2. Future Work & Known Issues
Known outstanding work and issues can be found under the riak_core
repository's issue list. All issues pertaining to this subsystem have
(and should be created with) the
Cluster Metadata
tag.
3. Tips for Debugging and Resolving Issues
The following is a list of debugging and testing tips as well as ways
to resolve unforeseen issues. It is by no means a complete list but it
is, hopefully, a growing one.
3.1 Viewing the Broadcast Tree
It may be useful to know what nodes are being communicated with when
replicating updates across the cluster for a given host.
riak_core_broadcast's debug_get_peers/2,3 and debug_get_tree/2
expose this information without needing to inspect the process state
of riak_core_broadcast. The documentation of those functions has more details.
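For example (the node names are illustrative; check those functions'
documentation for the exact argument order):

```erlang
%% Eager and lazy peers that 'dev2@127.0.0.1' uses for broadcasts
%% rooted at 'dev1@127.0.0.1'.
riak_core_broadcast:debug_get_peers('dev2@127.0.0.1', 'dev1@127.0.0.1'),
%% The whole broadcast tree rooted at 'dev1@127.0.0.1', covering the
%% listed members.
riak_core_broadcast:debug_get_tree('dev1@127.0.0.1',
                                   ['dev1@127.0.0.1',
                                    'dev2@127.0.0.1',
                                    'dev3@127.0.0.1']).
```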
3.2 Retrieving Objects, not Values
The API hides the internal representation of objects from the
client. However, when debugging and testing, it is sometimes useful to
see what the Metadata Manager actually has stored, especially the
logical clock. In this case riak_core_metadata_manager:get/1,2 can
be used to retrieve objects locally or from the Metadata Manager on
another node.
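For example (the prefix, key and node name are illustrative, and the
assumed argument shape is a single {FullPrefix, Key} pair):

```erlang
%% Full Metadata Object (including its logical clock) for a key, read
%% from the local Metadata Manager...
Obj = riak_core_metadata_manager:get({{myapp, settings}, timeout}),
%% ...and from the Metadata Manager on another node.
RemoteObj = riak_core_metadata_manager:get('dev2@127.0.0.1',
                                           {{myapp, settings}, timeout}).
```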
3.3 Forcing Conflicting Writes
Although not typically a good thing to do, for testing purposes it is
useful to know how to force the writing of siblings. Using the Metadata
Manager this can be done easily by writing values with no logical
clock:
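A sketch of two such writes (the put/3 argument shape assumed here is
{FullPrefix, Key}, a clock view, and the raw value, with undefined
standing in for "no clock"; the prefix, key and values are illustrative):

```erlang
%% Neither write carries a logical clock, so neither causally
%% dominates the other and both values are kept as siblings.
riak_core_metadata_manager:put({{myapp, settings}, timeout}, undefined, 1000),
riak_core_metadata_manager:put({{myapp, settings}, timeout}, undefined, 2000).
```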
The two calls can be run on the same node or on different nodes. If run on
different nodes, the siblings won't appear until after an exchange has completed.
3.4 Managing Exchanges
Most of the following is general to the broadcast mechanism but is
useful within the context of Cluster Metadata.
3.4.1 Querying Exchanges Started by a Node
The functions riak_core_broadcast:exchanges/0,1 can be used to
determine which exchanges are running that were started on the current
or given node, respectively. These functions return a list of
4-tuples. The 4-tuple contains the name of the module used as the
riak_core_broadcast_handler for the exchange (always
riak_core_metadata_manager for Cluster Metadata), the node that is the
peer of the exchange, a unique reference representing the exchange,
and the process id of the exchange fsm doing the work.
3.4.2 Canceling Exchanges
Running exchanges may be aborted using
riak_core_broadcast:cancel_exchanges/1. cancel_exchanges/1's
argument takes several forms.
To cancel all Cluster Metadata exchanges started by the node where
this command is run:
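The argument forms below follow cancel_exchanges/1's documentation
and should be double-checked there:

```erlang
%% All Cluster Metadata exchanges started by this node.
riak_core_broadcast:cancel_exchanges({mod, riak_core_metadata_manager}).
```

To cancel all exchanges started by this node with a given peer:

```erlang
riak_core_broadcast:cancel_exchanges({peer, Peer}).
```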
where Peer is the Erlang node name of the remote node.
To cancel a specific exchange started by the node where this command
is run:
riak_core_broadcast:cancel_exchanges(PidOrRef).
where PidOrRef is a process id or unique reference found in a
4-tuple returned from riak_core_broadcast:exchanges/0.
3.4.3 Manually Triggering Exchanges
If it is necessary to ensure that the data stored in Cluster Metadata
is the same between two nodes, an exchange can be triggered
manually. These exchanges are triggered by the broadcast mechanism so,
typically, this should not be necessary. However, it can be done by
running the following from an Erlang node running Cluster Metadata:
riak_core_metadata_manager:exchange(Peer)
Where Peer is the Erlang node name for another node in the
cluster. This will trigger an exchange between the node the command
was run on and Peer. This function does not block until the
completion of the exchange. It only starts it. If the exchange was
started successfully, {ok, Pid} will be returned, where Pid is the
process id of the exchange fsm doing the work in the background. If
the exchange could not be started, {error, Reason} will be returned.
3.5 Repairing DETS Files
DETS files may need to be repaired if a node exits abnormally and the
Metadata Manager was not allowed to properly close its files. When the
Metadata Manager restarts it will repair the files when they are
opened (this is the default DETS behavior).
More details on DETS repair can be found in the DETS documentation.
3.6 Removing Metadata Hashtree Files
Metadata Hashtrees are meant to be ephemeral, so they may be deleted
without affecting a cluster. The node must first be shut down and then
restarted. This ensures that the Metadata Hashtree process does not
crash when the files are deleted and that the tree is rebuilt when the
node starts back up.
The following describes the broadcast subsystem implemented by
riak_core_broadcast. It is intended for those wishing to use the
subsystem as well as those who support and modify it.
riak_core_broadcast provides a generalized mechanism for ensuring
eventual delivery of messages to all nodes in a riak_core
cluster. The development of this subsystem was driven primarily by the
implementation of Cluster Metadata; however, it was built for use in
other applications as well. Because of this generalization, the client
application/subsystem is required to provide certain guarantees
in order to ensure 100% reliability when delivering messages.
Broadcasted messages are opaque to riak_core_broadcast. The only
requirement is that each message must be uniquely identifiable. The
storage of messages is the responsibility of the application/subsystem
using riak_core_broadcast. For more details about what is required
when using riak_core_broadcast see
Broadcast Handlers.
To deliver messages riak_core_broadcast sends them along the paths
of a directed, possibly cyclic, graph of all nodes in the cluster. In
most cases, however, the graph will be a spanning tree. Only in the
cases where membership changes or failures occur will a less optimal
path be used. Keep reading for more details.
riak_core_broadcast provides the following guarantee regarding
message delivery -- assuming the application/subsystem using it meets
the necessary requirements (see
Broadcast Handlers):
A broadcasted message will be delivered to all nodes *at least
once*. There is one exception: when the node a broadcast originates on is
removed before it can deliver the payload to at least one other
node, this guarantee is not met.
Exploration of ways to mitigate the exception mentioned is being
tracked in
this issue.
2. Implementation
riak_core_broadcast is a hybrid of deterministic broadcast and
stochastic epidemic protocols: a self-healing spanning tree. The
primary path for the delivery of messages is to
broadcast using the spanning tree but when links fail, they are healed
by using pull-based gossip [1]. Sometimes the spanning tree may have
additional links -- making it a directed, and possibly cyclic, graph --
while it is being healed or when membership changes. In addition, the
entire mechanism is backed by a standard anti-entropy epidemic [1].
2.1 Plumtree
The implementation is based upon "Plumtree" [2], which specifies an
algorithm for a self-healing spanning tree. The paper [2] has a clear
and concise description of the algorithm so none will be recreated
here.
Plumtree was chosen, primarily, because of its efficiency in the
stable case while maintaining reliability. When the graph of nodes is
a proper spanning tree, each node will receive the payload exactly
once. In failure cases, nodes may receive the payload more than once,
but the act of doing so heals the tree and removes redundant links.
2.2 Differences & Extensions
riak_core_broadcast extends what is specified in the Plumtree
Protocol. The requirements of the implementation in [2] and
riak_core_broadcast differ in several ways. The following describes
those differences and their effect on the implementation.
2.2.1 Peer Service
In large-scale, partially connected eventual delivery protocols it is
common to separate responsibilities into two parts: the peer service
and the message overlay. The latter is what Plumtree implements and it
is responsible for providing a reliable and hopefully efficient
delivery mechanism for most messages. The former is responsible for
ensuring delivery of critical data like membership, among other things
not relevant to this document, and may do so less efficiently for the
sake of safety. The message overlay relies on the peer service and
makes certain assumptions about it. These assumptions are the source
of many of the differences between the implementation specified in [2]
and riak_core_broadcast. The Peer Service in [2] is HyParView
[3]. In riak_core it is the existing riak_core_gossip.
The assumptions made in [2] are discussed in section 3.3 of the
paper. The "Connectivity" requirement of the service is not one that
can be provided by Riak. Namely, a node must be able to be a singleton
during a long-lived partition and still ensure delivery to all
nodes. The implementation in [2] assumes in this case that the peer
service will fail this node out of the overlay and that failure to
deliver messages to it, or sent from it, is acceptable. We do not.
The paper also focuses on clusters on the order of tens of
thousands of nodes, whereas Riak clusters typically range from 5 to
100 nodes. The Peer Service for riak_core_broadcast is fully-connected --
each node is aware of every other node in the cluster and can
communicate with them directly outside of riak_core_broadcast. This
affects how membership is handled.
2.2.2 Historical Delivery
The reliability of Plumtree is measured in [2] by determining how many
healthy nodes receive the message, where healthy nodes are defined to
be nodes that have not failed while there are protocol messages
related to the broadcast "in the network". The reliability of
riak_core_broadcast, however, must define healthy nodes as those
that have neither permanently failed nor been removed from the
cluster.
When considering dynamic membership, the definition of healthy nodes
in [2] is extended to include joined nodes that did so while there
were still protocol messages related to the broadcast "in the
network". riak_core_broadcast extends this further to include any
node which may join in the future, despite protocol messages no longer
being "in the network".
This requirement can be thought of as "Historical Delivery".
2.2.3 Lazy Message Queuing
Plumtree is a reliable protocol in the face of massive node failures,
which was the focus of the research, however, it is not tolerant to
network partitions. This is hinted at in the "Connectivity"
requirement [2] of Plumtree's Peer Service and can be proven using a
formal model. To
address this, riak_core_broadcast extends how the set of unsent lazy
(or "i have") messages is maintained and adds an additional message:
"ignore i have".
The original algorithm removes the record of an "i have" message, used to
indicate it needs to be sent, as soon as one attempt is made to send
it. This is a problem when messages are dropped by the
network. riak_core_broadcast maintains this record after the attempt
and periodically scans the set of outstanding records, re-sending as needed.
The record is removed from the set when an "ignore i have" message is
received. This message is sent by the peer who received the "i have"
and indicates that no graft is necessary.
2.2.4 Anti-Entropy
In cases where a large percentage of nodes fail, the union of all sets
of lazy links between nodes, which heal the spanning tree, will no
longer contain links between all nodes. When this happens, messages
may not be delivered to all healthy nodes even under the less strict
definition used in [2]. Although [2] discusses a similar failure
scenario, the two are not quite the same because of the Peer Service;
however, the differences are not worth exploring further.
Although high failure percentages are not so common in riak_core
applications, it is still an unacceptable failure
scenario. riak_core_broadcast uses a classic anti-entropy epidemic
[1] to ensure that each node eventually checks-in with every other
node in the cluster and delivers any missing messages. These are
referred to as "Exchanges".
2.2.5 Initial Peers
Since the Peer Services of the two implementations differ, so does
the generation of each node's initial set of eager and lazy
peers. riak_core_broadcast uses
riak_core_util:build_tree/3. See riak_core_broadcast:init_peers/1
for more details.
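As a sketch of what that produces (the node names are illustrative;
the cycles option is assumed to make the tree wrap around so that
every node has children):

```erlang
%% Arrange the members into a 2-ary tree. The result maps each node to
%% its list of children, which become that node's initial eager peers.
Members = ['n1@host', 'n2@host', 'n3@host', 'n4@host', 'n5@host'],
Tree = riak_core_util:build_tree(2, Members, [cycles]),
EagerPeersOfN1 = orddict:fetch('n1@host', Tree).
```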
2.2.6 Dynamic Membership
Since riak_core_gossip is fully-connected, unlike HyParView [3], how
the system deals with membership changes has also been
modified. riak_core_broadcast simply regenerates the initial peers,
as if it were restarted with the new membership information, and allows
the protocol to modify things as needed.