A multi-master (AKA distributed) counter implemented as a Conflict-free Replicated Data Type.
-module(mmc).
% A multi-master counter implemented as a CRDT. Each server holds an array of counters (implemented as a dict here),
% and Strong Eventual Consistency is achieved by using commutative increments on each of the counters. The current
% value read at any server is the sum of all the counter values (again, one per server) stored on that machine.
% As updates are synced between servers, all servers will eventually converge on the same counter sum, with no
% conflicts possible. Also note that this does not use any consensus algorithm, so it remains fast as the number of
% servers scales. See http://research.microsoft.com/apps/video/default.aspx?id=153540&r=1
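% A tiny worked example (illustrative values): suppose replica A's dict holds
% [{A,2}, {B,1}] and replica B's holds [{A,1}, {B,3}]. Once each replica has
% received the other's latest counter, both hold [{A,2}, {B,3}] and read/1
% returns 5 on either one. A textbook G-counter merges by taking the per-key
% maximum; this sketch simply overwrites with the sender's own value, which
% still converges here because each counter only grows, is written only by its
% owner, and Erlang preserves message order between any two processes.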
-behaviour(gen_server).
% Client API
-export([start_link/0, read/1, increment/1, send_state/2, stop/1]).
% Server Implementation
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]).
%---------------------------------------------------------------------------------------------
% Client API
%---------------------------------------------------------------------------------------------
start_link() -> gen_server:start_link(?MODULE, [], []).
% Get the value of the counter.
read(ServerPid) -> gen_server:call(ServerPid, read).
% Tell the cluster (via ANY server) to increment the distributed counter.
increment(ServerPid) -> gen_server:call(ServerPid, increment).
% Command this server to send its current state to another server.
send_state(SourceServerPid, DestinationServerPid) -> gen_server:call(SourceServerPid, {send_state_to, DestinationServerPid}).
stop(ServerPid) -> gen_server:cast(ServerPid, stop).
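% Example shell session (a sketch; pids and return values are illustrative):
%   1> {ok, S1} = mmc:start_link().
%   2> {ok, S2} = mmc:start_link().
%   3> mmc:increment(S1).
%   ok
%   4> mmc:read(S1).
%   1
%   5> mmc:send_state(S1, S2).
%   ok
%   6> mmc:read(S2).
%   1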
%---------------------------------------------------------------------------------------------
% Server Implementation
%---------------------------------------------------------------------------------------------
init([]) ->
    % Each server starts with a single counter for itself, initialized to zero.
    Dict = dict:store(self(), 0, dict:new()),
    {ok, Dict}.
handle_cast(stop, Dict) -> {stop, normal, Dict};
% Merge in another server's counter. Overwriting is safe in this design because
% each key is only ever incremented by its owner, so the incoming value is never stale.
handle_cast({update_remote_state, RemoteServerId, RemoteValue}, Dict) ->
    Dict1 = dict:store(RemoteServerId, RemoteValue, Dict),
    {noreply, Dict1};
handle_cast(_Msg, State) -> {noreply, State}.
handle_call(read, _From, Dict) ->
    % The counter's value is the sum of every server's entry.
    Val = dict:fold(fun(_K, V, Acc) -> V + Acc end, 0, Dict),
    {reply, Val, Dict};
handle_call(increment, _From, Dict) ->
    % Only ever increment our own entry; remote entries are updated via replication.
    Dict1 = dict:update_counter(self(), 1, Dict),
    {reply, ok, Dict1};
handle_call({send_state_to, DestinationServerPid}, _From, Dict) ->
    SourceServerPid = self(),
    SourceValue = dict:fetch(self(), Dict),
    % Use cast to avoid a deadlock: a synchronous call could block forever if the
    % destination were simultaneously sending its state to us.
    gen_server:cast(DestinationServerPid, {update_remote_state, SourceServerPid, SourceValue}),
    {reply, ok, Dict}.
handle_info(Message, Dict) ->
    io:format("Unexpected message ~p~n", [Message]),
    {noreply, Dict}.
% Match any reason, so terminate doesn't crash on abnormal shutdowns.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%---------------------------------------------------------------------------------------------
% Unit Tests
%---------------------------------------------------------------------------------------------
-include_lib("eunit/include/eunit.hrl").
all_test() ->
    {ok, Server1} = start_link(),
    {ok, Server2} = start_link(),
    ?assertEqual(0, read(Server1)),
    ?assertEqual(0, read(Server2)),
    increment(Server1), % Begin concurrent updates...
    ?assertEqual(1, read(Server1)),
    ?assertEqual(0, read(Server2)),
    increment(Server2), % Concurrent updates complete.
    ?assertEqual(1, read(Server1)),
    ?assertEqual(1, read(Server2)), % Replicas have not converged...
    % This is kind of like a split-brain... pretend it healed and begin sending updates throughout the cluster:
    send_state(Server1, Server2), % Replicate the update on S1 to S2.
    ?assertEqual(1, read(Server1)),
    ?assertEqual(2, read(Server2)), % Server two has converged...
    send_state(Server2, Server1), % Replicate the update on S2 to S1.
    ?assertEqual(2, read(Server1)), % Server one has converged...
    ?assertEqual(2, read(Server2)), % The DISTRIBUTED value of the replica set has converged completely! SEC worked without conflict :)
    stop(Server1),
    stop(Server2).
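% A further convergence sketch: three replicas each take one concurrent
% increment, then exchange state pairwise so every replica learns every counter.
convergence_test() ->
    {ok, S1} = start_link(),
    {ok, S2} = start_link(),
    {ok, S3} = start_link(),
    [increment(S) || S <- [S1, S2, S3]],
    % Full pairwise exchange; order doesn't matter, only that each pair syncs.
    [send_state(From, To) || From <- [S1, S2, S3], To <- [S1, S2, S3], From =/= To],
    ?assertEqual(3, read(S1)),
    ?assertEqual(3, read(S2)),
    ?assertEqual(3, read(S3)),
    [stop(S) || S <- [S1, S2, S3]].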