Skip to content

Instantly share code, notes, and snippets.

@asonge
Created February 22, 2014 21:21
Show Gist options
  • Save asonge/9162596 to your computer and use it in GitHub Desktop.
Save asonge/9162596 to your computer and use it in GitHub Desktop.

Ravel

Ravel provides a set of CRDTs (Conflict-free Replicated Data Types) for Elixir, with more than a little inspiration from Basho's https://github.com/basho/riak_dt

About the name

CRDTs seem to be a great way to overcomplicate simple data structures, and some people ...

# Protocol describing the operations every Ravel CRDT exposes: listing actors,
# structural and value equality, merging, and reading the plain value.
defprotocol Ravel.CRDT do
# Identifier under which a CRDT tracks its updates; any term is accepted.
@type actor :: any
# The CRDT data structure itself (left unconstrained here).
@type t :: any
@doc "List the actors"
@spec actors(t) :: [actor]
def actors(crdt)
# Structural comparison of the two CRDTs (not a comparison of their values).
@doc "See if CRDT's are equal, not value-equal"
@spec equal?(t, t) :: boolean
def equal?(crdt1, crdt2)
# Comparison of what `value/1` yields for each CRDT.
@doc "See if CRDT's values are equal"
@spec equal_value?(t, t) :: boolean
def equal_value?(crdt1, crdt2)
# Combines two CRDTs of the same kind into one.
@doc "Merge the crdts"
@spec merge(t, t) :: t
def merge(crdt1, crdt2)
# Returns the plain (non-CRDT) term the structure currently represents.
@doc "Get the primitive value of the crdt"
@spec value(t) :: term
def value(crdt)
end
defmodule Ravel.GCounter do
  @moduledoc """
  A grow-only counter.

  Each actor owns one count stored in a `HashDict`. Incrementing only raises
  the count of the acting actor, merging keeps the per-actor maximum, and the
  value of the counter is the sum of all per-actor counts.
  """

  @type actor :: term
  # A freshly created counter has the value 0, hence non_neg_integer.
  @type value :: non_neg_integer

  defrecordp :s, __MODULE__, dict: nil

  # The record is the 2-tuple `{Ravel.GCounter, dict}`.
  @opaque gcounter :: {__MODULE__, dict :: Dict.t}

  @doc "Create a new, empty gcounter (its value is 0)."
  @spec new() :: gcounter
  def new() do
    s(dict: HashDict.new())
  end

  @doc "Get the sorted list of actors that have a count in this counter."
  @spec actors(gcounter) :: [actor]
  def actors(s(dict: t)) do
    Enum.sort(HashDict.keys(t))
  end

  @doc "Increment the counter for `actor` by 1."
  @spec increment(counter :: gcounter, actor) :: gcounter
  def increment(counter, actor) do
    increment(counter, actor, 1)
  end

  @doc """
  Increment the counter for `actor` by `n`.

  `n` must be a positive integer; anything else raises `FunctionClauseError`.
  """
  @spec increment(counter :: gcounter, actor, pos_integer) :: gcounter
  # `is_integer/1` is required: in Erlang term ordering atoms, tuples, etc.
  # compare greater than any number, so `n > 0` alone lets them through.
  def increment(s(dict: t), actor, n) when is_integer(n) and n > 0 do
    s(dict: HashDict.update(t, actor, n, fn(v) -> v + n end))
  end

  @doc "Merge 2 gcounters by keeping, for every actor, the larger of the two counts."
  @spec merge(gcounter, gcounter) :: gcounter
  def merge(s(dict: v1), s(dict: v2)) do
    s(dict: HashDict.merge(v1, v2, fn(_actor, count1, count2) -> max(count1, count2) end))
  end

  @doc "The value of the grow-only counter: the sum of the counts of all actors."
  @spec value(gcounter) :: value
  def value(s(dict: t)) do
    Enum.reduce(t, 0, fn({_actor, count}, acc) -> acc + count end)
  end
end
defimpl Ravel.CRDT, for: Ravel.GCounter do
  # Adapter that maps the generic CRDT protocol onto the GCounter functions.
  alias Ravel.GCounter

  def actors(crdt) do
    GCounter.actors(crdt)
  end

  # Strict comparison of the whole counter structures.
  def equal?(crdt1, crdt2) do
    crdt1 === crdt2
  end

  # Only the summed totals are compared, not the per-actor breakdown.
  def equal_value?(crdt1, crdt2) do
    left_total = GCounter.value(crdt1)
    right_total = GCounter.value(crdt2)
    left_total === right_total
  end

  def merge(crdt1, crdt2) do
    GCounter.merge(crdt1, crdt2)
  end

  def value(crdt) do
    GCounter.value(crdt)
  end
end
defimpl Inspect, for: Ravel.GCounter do
  import Inspect.Algebra

  # Pretty mode prints only the total; otherwise the sorted per-actor counts
  # are printed.
  def inspect({Ravel.GCounter, dict} = counter, opts) do
    case opts.pretty do
      true ->
        total = Ravel.GCounter.value(counter)
        "#GCounter<#{total}>"
      false ->
        entries = Enum.sort(HashDict.to_list(dict))
        concat ["#GCounter<", Inspect.List.inspect(entries, opts), ">"]
    end
  end
end
# Empty placeholder module: no functions are defined yet.
# NOTE(review): presumably intended to become a last-write-wins register CRDT — confirm.
defmodule Ravel.LWWRegister do
end
defmodule Ravel.Mixfile do
  use Mix.Project

  # Project metadata read by Mix.
  def project do
    [app: :ravel,
     version: "0.0.1",
     elixir: "~> 0.12.4",
     deps: deps()]
  end

  # Configuration for the OTP application
  def application do
    [mod: {Ravel, []}]
  end

  # Returns the list of dependencies in the format:
  # { :foobar, git: "https://github.com/elixir-lang/foobar.git", tag: "0.1" }
  #
  # To specify particular versions, regardless of the tag, do:
  # { :barbat, "~> 0.1", github: "elixir-lang/barbat" }
  defp deps do
    [{:ex_doc, github: "elixir-lang/ex_doc"},
     {:excheck, github: "parroty/excheck"}]
  end
end
defmodule Ravel.ORDict do
  @moduledoc """
  An observed-remove Dictionary.

  Doesn't use tombstones, but has 1 vclock for each entry and a vclock for the
  dict. When dicts are merged, the dict vclocks are used to make sure that deletes
  don't reappear.

  Every stored value is itself a CRDT: a tuple whose first element is the module
  implementing `merge/2` for it.

  Based on `riak_dt_map`.
  """
  alias Ravel.VClock
  @behaviour Dict

  defrecordp :s, __MODULE__, [clock: nil, values: nil]

  @type vclock :: VClock.vclock
  @type actor :: term
  @type key :: term
  @type crdt :: tuple # First element of tuple is the Module
  @opaque ordict :: {__MODULE__, clock :: vclock, values :: Dict.t}
  # @opaque rawkey :: term
  @opaque rawvalue :: {vclock, crdt}

  @doc "Creates a new, empty ORDict."
  @spec new() :: ordict
  def new() do
    s(clock: VClock.new(), values: HashDict.new())
  end

  @doc """
  Creates a new ORDict from an enumerable of `{actor, {key, crdt}}` tuples.

  If any element has a different shape the result is
  `{:error, :need_actor_tuple}` instead of an ORDict.
  """
  @spec new(Enum.t) :: ordict | {:error, :need_actor_tuple}
  def new(enum) do
    Enum.reduce(enum, new(), &add_actor_entry/2)
  end

  @doc """
  Creates a new ORDict from an enumerable and either options or a function.

  * With a keyword list, `enum` holds `{key, crdt}` pairs which are all added
    on behalf of the mandatory `:actor` option.
  * With a function, every element is first mapped through it to an
    `{actor, {key, crdt}}` tuple and the result is handed to `new/1`.
  """
  @spec new(Enum.t, Keyword.t) :: ordict
  @spec new(Enum.t, (term -> {actor, {key, crdt}})) :: ordict | {:error, :need_actor_tuple}
  def new(enum, opts) when is_list(opts) do
    actor = Keyword.fetch!(opts, :actor)
    Enum.reduce(enum, new(), fn({k, v}, acc) -> add!(acc, actor, k, v) end)
  end
  def new(enum, fun) when is_function(fun) do
    new(Enum.map(enum, fun))
  end

  @doc "Get which actors are being used"
  @spec actors(ordict) :: [actor]
  def actors(s(clock: clock) = _ordict) do
    VClock.actors(clock)
  end

  @doc """
  Add an element to the dict on behalf of `actor`.

  The dict clock is incremented for `actor`; the entry's clock gets the new dot
  merged in and the stored crdt is replaced by `crdt`.
  """
  @spec add(ordict, actor, key, crdt) :: {:ok, ordict}
  def add(s(clock: setclock, values: values) = ordict, actor, key, crdt) when is_atom(elem(crdt, 0)) do
    new_clock = VClock.increment(setclock, actor)
    new_value_clock = VClock.new([{actor, VClock.get_counter(new_clock, actor)}])
    new_values = HashDict.update(values, key, {new_value_clock, crdt}, fn({old_value_clock, _old_crdt}) ->
      {VClock.merge(new_value_clock, old_value_clock), crdt}
    end)
    {:ok, s(ordict, clock: new_clock, values: new_values)}
  end

  @doc "Add an element to the dict, returning the bare dict instead of `{:ok, dict}`."
  @spec add!(ordict, actor, key, crdt) :: ordict
  def add!(ordict, actor, key, value) do
    {:ok, new_ordict} = add(ordict, actor, key, value)
    new_ordict
  end

  @doc "Removes element from dict, but violates semantics and ignores preconditions!"
  @spec delete(ordict, key) :: ordict
  def delete(ordict, key) do
    case remove(ordict, key) do
      {:ok, newdict} -> newdict
      _ -> ordict
    end
  end

  @doc "Removes elements from dict, but violates semantics and ignores preconditions!"
  @spec drop(ordict, [key]) :: ordict
  def drop(s(values: values) = ordict, keys) do
    s(ordict, values: Dict.drop(values, keys))
  end

  @doc "Return an empty dict of the same type as ordict"
  @spec empty(ordict) :: ordict
  def empty(s() = _ordict), do: new()

  @doc """
  Check if the values of the two ordicts are equal.

  Both dicts must hold exactly the same keys with equal crdts; the clocks are
  not compared.
  """
  @spec equal?(ordict, ordict) :: boolean
  def equal?(s(values: v1), s(values: v2)) do
    # The size check makes the comparison symmetric; the fetch avoids crashing
    # on a key that only exists in the first dict.
    Dict.size(v1) == Dict.size(v2) and
      Enum.all?(v1, fn({key, {_clock, crdt1}}) ->
        case Dict.fetch(v2, key) do
          {:ok, {_, crdt2}} -> crdt1 == crdt2
          :error -> false
        end
      end)
  end

  @doc "Returns `{ :ok, value }` associated with key in dict. If dict does not contain key, returns `:error`."
  @spec fetch(ordict, key) :: {:ok, crdt} | :error
  def fetch(s(values: v) = _ordict, key) do
    case Dict.fetch(v, key) do
      {:ok, {_clock, crdt}} -> {:ok, crdt}
      other -> other
    end
  end

  @doc "Returns the value associated with key in dict. If dict does not contain key, it raises KeyError."
  @spec fetch!(ordict, key) :: crdt | no_return
  def fetch!(s(values: v) = _ordict, key) do
    {_clock, crdt} = Dict.fetch!(v, key)
    crdt
  end

  @doc "Returns the value associated with key in dict. If dict does not contain key, returns default (or nil if not provided)."
  @spec get(ordict, key, crdt) :: crdt | nil
  def get(s(values: v) = _ordict, key, default \\ nil) do
    case Dict.get(v, key) do
      nil -> default
      {_clock, crdt} -> crdt
    end
  end

  @doc "Returns whether the given key exists in the given dict."
  @spec has_key?(ordict, key) :: boolean
  def has_key?(s(values: v) = _ordict, key) do
    Dict.has_key?(v, key)
  end

  @doc "Returns a list of all keys in dict. The keys are not guaranteed to be in any order."
  @spec keys(ordict) :: list
  def keys(s(values: v) = _ordict) do
    Dict.keys(v)
  end

  @doc "Merge two ordicts"
  @spec merge(ordict, ordict) :: ordict
  def merge(ordict1, ordict2) do
    do_merge(ordict1, ordict2)
  end

  @doc """
  Merge two ordicts using `fun` to merge the entries present in both.

  `fun` receives the set of common keys and both raw value dicts and must
  return the dict of merged `{clock, crdt}` entries. USE WITH CAUTION.
  """
  @spec merge(ordict, ordict, (Set.t, Dict.t, Dict.t -> Dict.t)) :: ordict
  def merge(ordict1, ordict2, fun) do
    do_merge(ordict1, ordict2, fun)
  end

  @doc "Merge implementation behind `merge/2`; identical dicts are returned as-is."
  @spec do_merge(ordict, ordict) :: ordict
  def do_merge(ordict1, ordict2) when ordict1 == ordict2 do
    ordict1
  end
  def do_merge(ordict1, ordict2) do
    do_merge(ordict1, ordict2, &merge_common_keys/3)
  end

  @doc "Merge two ordicts with a different merge function...USE WITH CAUTION"
  @spec do_merge(ordict, ordict, (Set.t, Dict.t, Dict.t -> Dict.t)) :: ordict
  def do_merge(s(clock: clock1, values: values1) = v1, s(clock: clock2, values: values2) = _v2, fun) do
    newclock = VClock.merge(clock1, clock2)
    keys1 = HashSet.new(Dict.keys(values1))
    keys2 = HashSet.new(Dict.keys(values2))
    common_keys = Set.intersection(keys1, keys2)
    uniquekeys1 = Set.difference(keys1, common_keys)
    uniquekeys2 = Set.difference(keys2, common_keys)
    # Keys only one side knows are checked against the OTHER side's dict clock.
    newvalues =
      fun.(common_keys, values1, values2)
      |> merge_disjoint_keys(uniquekeys1, values1, clock2)
      |> merge_disjoint_keys(uniquekeys2, values2, clock1)
    s(v1, clock: newclock, values: newvalues)
  end

  @doc """
  Returns the value associated with `key` in `dict` as well as the `dict` without `key`.

  If `key` is not present, `default` is returned together with the unchanged dict.
  """
  @spec pop(ordict, key, term) :: {term, ordict}
  def pop(s(values: v) = ordict, key, default \\ nil) do
    case Dict.fetch(v, key) do
      {:ok, {_clock, crdt}} -> {crdt, remove!(ordict, key)}
      :error -> {default, ordict}
    end
  end

  @doc """
  Insert new element into dict.

  Note the order of the tuple: `{key, actor}` (unlike `update/4` and
  `update!/3`, which take `{actor, key}`).
  """
  @spec put(ordict, {key, actor}, crdt) :: ordict
  def put(ordict, {key, actor}, value) do
    add!(ordict, actor, key, value)
  end

  @doc """
  This operation is not supported and always throws `:not_supported`.

  Concurrent adds will violate the semantics of this operation.
  """
  @spec put_new(ordict, {key, actor}, crdt) :: no_return
  def put_new(_ordict, {_key, _actor}, _crdt) do
    throw :not_supported
  end

  @doc "Callback implementation for Enumerable support, do not use directly"
  @spec reduce(ordict, Enumerable.acc, Enumerable.reducer) :: Enumerable.result
  def reduce(s(values: v), acc, fun) do
    pairs = Enum.map(v, fn({k, {_clock, crdt}}) -> {k, crdt} end)
    do_reduce(Enum.sort(pairs), acc, fun)
  end

  @doc "Remove an element from the dict"
  @spec remove(ordict, key) :: {:ok, ordict} | {:error, {:precondition, {:not_present, term()}}}
  def remove(s(values: values) = ordict, key) do
    case values[key] do
      nil -> {:error, {:precondition, {:not_present, key}}}
      _ -> {:ok, s(ordict, values: Dict.delete(values, key))}
    end
  end

  @doc "Remove an element from the dict; fails with a MatchError if the key is not present."
  @spec remove!(ordict, key) :: ordict
  def remove!(ordict, key) do
    {:ok, new_ordict} = remove(ordict, key)
    new_ordict
  end

  @doc "Get the size of the values"
  @spec size(ordict) :: non_neg_integer
  def size(s(values: values)) do
    Dict.size(values)
  end

  @doc """
  Splits the dict into `{taken, rest}`: entries whose key is in `keys` and all others.

  Like `drop/2` this only filters the entries; both halves keep the full dict
  clock, so the same semantic caveats apply.
  """
  @spec split(ordict, [key]) :: {ordict, ordict}
  def split(s() = ordict, keys) do
    {take(ordict, keys), drop(ordict, keys)}
  end

  @doc """
  Returns a dict holding only the entries whose key is in `keys`.

  Like `drop/2` this only filters the entries and keeps the dict clock, so the
  same semantic caveats apply.
  """
  @spec take(ordict, [key]) :: ordict
  def take(s(values: values) = ordict, keys) do
    s(ordict, values: Dict.take(values, keys))
  end

  @doc "To list, is value"
  @spec to_list(ordict) :: [term]
  def to_list(ordict), do: value(ordict)

  @doc """
  Update a value in `dict` by calling fun on the value to get a new value. If
  key is not present in dict then initial will be stored as the first value.

  The second argument is an `{actor, key}` tuple.
  """
  @spec update(ordict, {actor, key}, crdt, (crdt -> crdt)) :: ordict
  def update(s(values: v) = ordict, {actor, key}, crdt, fun) do
    case v[key] do
      nil ->
        add!(ordict, actor, key, crdt)
      {_old_clock, old_crdt} ->
        add!(ordict, actor, key, fun.(old_crdt))
    end
  end

  @doc """
  Update a value in `dict` by calling fun on the value to get a new value.
  Throws `:key_not_present` if `key` is not present in the `dict`.

  The second argument is an `{actor, key}` tuple.
  """
  @spec update!(ordict, {actor, key}, (crdt -> crdt)) :: ordict | no_return
  def update!(s(values: v) = ordict, {actor, key}, fun) do
    case v[key] do
      nil ->
        throw :key_not_present
      {_old_clock, old_crdt} ->
        add!(ordict, actor, key, fun.(old_crdt))
    end
  end

  @doc "Get the value of a ORDict. In this case, it's a list of `{key, crdt}` pairs."
  @spec value(ordict) :: [term()]
  def value(s(values: values)) do
    Enum.map(values, fn({k, {_clock, v}}) ->
      # module = elem(v, 0)
      # {k, module.value(v)}
      {k, v}
    end)
  end

  @doc "Get the values as a list"
  @spec values(ordict) :: [crdt]
  def values(s(values: v) = _ordict) do
    Enum.map(Dict.values(v), fn({_clock, crdt}) -> crdt end)
  end

  ####### TODO #############

  # Reducer behind new/1: adds one `{actor, {key, crdt}}` element; any other
  # shape (or an accumulator that already is an error) yields the error tuple.
  defp add_actor_entry({actor, {key, value}}, s() = acc), do: add!(acc, actor, key, value)
  defp add_actor_entry(_entry, _acc), do: {:error, :need_actor_tuple}

  # Merge common keys into a new values dictionary
  @spec merge_common_keys(Set.t, Dict.t, Dict.t) :: Dict.t
  defp merge_common_keys(common_keys, values1, values2) do
    merged = Enum.map(common_keys, fn(k) ->
      {clock1, crdt1} = values1[k]
      {clock2, crdt2} = values2[k]
      {k, {VClock.merge(clock1, clock2), merge_child_crdt(crdt1, crdt2)}}
    end)
    HashDict.new(merged)
  end

  # Merge disjoint keys into the values dictionary.
  # An entry is kept only when the other side's dict clock does not descend
  # its entry clock; otherwise it is left out of the result.
  @spec merge_disjoint_keys(Dict.t, Set.t, Dict.t, vclock) :: Dict.t
  defp merge_disjoint_keys(new_values, keys, values, setclock) do
    Enum.reduce(keys, new_values, fn(k, acc) ->
      {vclock, crdt} = values[k]
      case VClock.descends(setclock, vclock) do
        false ->
          new_clock = VClock.subtract_dots(vclock, setclock)
          Dict.put(acc, k, {new_clock, crdt})
        true ->
          acc
      end
    end)
  end

  # Child crdts are merged by the module named in their first tuple element;
  # merging crdts of two different modules is refused.
  defp merge_child_crdt(crdt1, crdt2) when elem(crdt1, 0) !== elem(crdt2, 0) do
    throw :crdt_mismatch
  end
  defp merge_child_crdt(crdt1, crdt2) do
    module = elem(crdt1, 0)
    module.merge(crdt1, crdt2)
  end

  # List-based implementation of the Enumerable reduce contract.
  defp do_reduce(_, { :halt, acc }, _fun), do: { :halted, acc }
  defp do_reduce(list, { :suspend, acc }, fun), do: { :suspended, acc, &do_reduce(list, &1, fun) }
  defp do_reduce([], { :cont, acc }, _fun), do: { :done, acc }
  defp do_reduce([h|t], { :cont, acc }, fun), do: do_reduce(t, fun.(h, acc), fun)
end
defimpl Ravel.CRDT, for: Ravel.ORDict do
  # Adapter that maps the generic CRDT protocol onto the ORDict functions.
  alias Ravel.ORDict

  def actors(crdt) do
    ORDict.actors(crdt)
  end

  # Strict comparison of the whole structures, clocks included.
  def equal?(crdt1, crdt2) do
    crdt1 === crdt2
  end

  # Compares only what value/1 yields for each dict.
  def equal_value?(crdt1, crdt2) do
    left = ORDict.value(crdt1)
    right = ORDict.value(crdt2)
    left === right
  end

  def merge(crdt1, crdt2) do
    ORDict.merge(crdt1, crdt2)
  end

  def value(crdt) do
    ORDict.value(crdt)
  end
end
defimpl Enumerable, for: Ravel.ORDict do
  # Enumerable support for ORDict. The enumerated elements are the
  # `{key, crdt}` pairs produced by `ORDict.reduce/3`.
  alias Ravel.ORDict

  # Number of entries in the dict.
  def count(ordict), do: {:ok, ORDict.size(ordict)}

  # Membership must agree with what reduce/3 enumerates, i.e. `{key, crdt}`
  # pairs: a pair is a member when `key` is present and maps to exactly that
  # crdt. Anything that is not a pair can never be an element.
  def member?(ordict, {key, value}) do
    {:ok, match?({:ok, ^value}, ORDict.fetch(ordict, key))}
  end
  def member?(_ordict, _other), do: {:ok, false}

  def reduce(ordict, acc, fun), do: ORDict.reduce(ordict, acc, fun)
end
defimpl Inspect, for: Ravel.ORDict do
  import Inspect.Algebra

  # Pretty mode hides all clocks and prints the sorted `{key, crdt}` pairs;
  # otherwise the dict clock and the raw `{key, {clock, crdt}}` entries are shown.
  def inspect({Ravel.ORDict, clock, values}, opts) do
    case opts.pretty do
      false ->
        raw_entries = Enum.sort(HashDict.to_list(values))
        concat [
          "#ORDict<",
          Kernel.inspect(clock, [pretty: true]), ", ",
          Inspect.List.inspect(raw_entries, opts),
          ">"
        ]
      true ->
        strip_clock = fn({k, {_, v}}) -> {k, v} end
        pairs = Enum.sort(Enum.map(HashDict.to_list(values), strip_clock))
        concat [
          "#ORDict<",
          Inspect.List.inspect(pairs, opts),
          ">"
        ]
    end
  end
end
defmodule Ravel.ORSet do
  @moduledoc """
  An observed-remove Set.

  Doesn't use tombstones, but has 1 vclock for each entry and a vclock for the
  set. When sets are merged, the set vclocks are used to make sure that deletes
  don't reappear.

  Based on `riak_dt_orswot`.
  """
  alias Ravel.VClock
  @behaviour Set

  defrecordp :s, __MODULE__, [clock: nil, values: nil]

  @type vclock :: VClock.vclock
  @type actor :: term
  @opaque orset :: {__MODULE__, clock :: vclock, values :: Dict.t}

  @doc "Creates a new, empty ORSet."
  @spec new() :: orset
  def new() do
    s(clock: VClock.new(), values: HashDict.new())
  end

  @doc """
  Creates a new ORSet from an enumerable of `{actor, element}` tuples.

  If any item has a different shape the result is
  `{:error, :need_actor_tuple}` instead of an ORSet.
  """
  @spec new(Enum.t) :: orset | {:error, :need_actor_tuple}
  def new(enum) do
    Enum.reduce(enum, new(), &add_actor_value/2)
  end

  @doc "Creates a new ORSet given an Enum, adding every element on behalf of `actor`."
  @spec new(Enum.t, actor) :: orset
  def new(enum, actor) do
    Enum.reduce(enum, new(), fn(v, acc) -> add!(acc, actor, v) end)
  end

  @doc "List the current actors used"
  @spec actors(orset) :: [actor]
  def actors(s(clock: clock) = _orset) do
    VClock.actors(clock)
  end

  @doc """
  Add an element to the set on behalf of `actor`.

  The set clock is incremented for `actor` and the new dot is merged into the
  element's clock.
  """
  @spec add(orset, actor, term) :: {:ok, orset}
  def add(s(clock: setclock, values: values) = orset, actor, elem) do
    new_clock = VClock.increment(setclock, actor)
    new_value_clock = VClock.new([{actor, VClock.get_counter(new_clock, actor)}])
    new_values = HashDict.update(values, elem, new_value_clock, fn(old_value_clock) ->
      VClock.merge(new_value_clock, old_value_clock)
    end)
    {:ok, s(orset, clock: new_clock, values: new_values)}
  end

  @doc "Add an element to the set, returning the bare set instead of `{:ok, set}`."
  @spec add!(orset, actor, term) :: orset
  def add!(orset, actor, elem) do
    {:ok, new_orset} = add(orset, actor, elem)
    new_orset
  end

  @doc "Removes element from set, but violates semantics and ignores preconditions!"
  @spec delete(orset, term) :: orset
  def delete(orset, elem) do
    case remove(orset, elem) do
      {:ok, newset} -> newset
      _ -> orset
    end
  end

  @doc "Difference between 2 sets: `set1` without every element listed in `set2`."
  @spec difference(orset, orset) :: orset
  def difference(set1, set2) do
    Enum.reduce(to_list(set2), set1, &delete(&2, &1))
  end

  @doc """
  Check if the sets have no common elements and are causally disjoint.

  The semantics of this function are not simple. If a merge function would
  remove an element, even if that element is not "present" in each set, then
  the sets are not considered disjoint.

  If you want these simpler semantics, you can simply cast to a HashSet and
  rerun the disjoint function.
  """
  @spec disjoint?(orset, orset) :: boolean
  def disjoint?(s(clock: clock1, values: values1) = _orset1, s(clock: clock2, values: values2) = _orset2) do
    keys1 = HashSet.new(Dict.keys(values1))
    keys2 = HashSet.new(Dict.keys(values2))
    case Enum.count(Set.intersection(keys1, keys2)) do
      0 ->
        # No shared elements: additionally require that neither set clock
        # descends any entry clock of the other set.
        Enum.all?(values1, &do_disjoint_detect(clock2, &1)) and
          Enum.all?(values2, &do_disjoint_detect(clock1, &1))
      _ ->
        false
    end
  end

  @doc "Return an empty set of the same type as orset"
  @spec empty(orset) :: orset
  def empty(s() = _orset), do: new()

  @doc """
  Check if the values of the orsets are equal.

  Both sets must contain exactly the same elements; clocks are not compared.
  """
  @spec equal?(orset, orset) :: boolean
  def equal?(s(values: v1), s(values: v2)) do
    # The size check makes this symmetric: without it a set would be reported
    # equal to any superset of itself.
    Dict.size(v1) == Dict.size(v2) and
      Enum.all?(Dict.keys(v1), fn(k) -> Dict.has_key?(v2, k) end)
  end

  @doc "Intersection: the elements present in both sets, with merged clocks."
  @spec intersection(orset, orset) :: orset
  def intersection(s(clock: clock1, values: values1) = v1, s(clock: clock2, values: values2) = _v2) do
    # I'm doing this just like merge(), but only for common keys, removing deletes in the merge_common_keys()
    newclock = VClock.merge(clock1, clock2)
    keys1 = HashSet.new(Dict.keys(values1))
    keys2 = HashSet.new(Dict.keys(values2))
    common_keys = Set.intersection(keys1, keys2)
    newvalues = merge_common_keys(common_keys, values1, values2)
    s(v1, clock: newclock, values: newvalues)
  end

  @doc "Check if the set contains an element"
  @spec member?(orset, term) :: boolean
  def member?(s(values: values), elem) do
    Dict.has_key?(values, elem)
  end

  @doc "Merge two orsets"
  @spec merge(orset, orset) :: orset
  def merge(orset1, orset2) do
    do_merge(orset1, orset2)
  end

  @doc "Merge implementation behind `merge/2`; identical sets are returned as-is."
  @spec do_merge(orset, orset) :: orset
  def do_merge(v1, v2) when v1 == v2 do
    v1
  end
  def do_merge(s(clock: clock1, values: values1) = v1, s(clock: clock2, values: values2) = _v2) do
    newclock = VClock.merge(clock1, clock2)
    keys1 = HashSet.new(Dict.keys(values1))
    keys2 = HashSet.new(Dict.keys(values2))
    common_keys = Set.intersection(keys1, keys2)
    uniquekeys1 = Set.difference(keys1, common_keys)
    uniquekeys2 = Set.difference(keys2, common_keys)
    # Elements only one side knows are checked against the OTHER side's set clock.
    newvalues =
      merge_common_keys(common_keys, values1, values2)
      |> merge_disjoint_keys(uniquekeys1, values1, clock2)
      |> merge_disjoint_keys(uniquekeys2, values2, clock1)
    s(v1, clock: newclock, values: newvalues)
  end

  @doc "Insert new element into set; the second argument is an `{actor, element}` tuple."
  @spec put(orset, {actor, term}) :: orset
  def put(orset, {actor, elem}), do: add!(orset, actor, elem)

  @doc "Check if one set is a subset of the other...gotta think about the deletes"
  @spec subset?(orset, orset) :: boolean
  def subset?(s(values: v1) = _set1, s(values: v2) = _set2) do
    Enum.all?(v1, fn({k, _clock}) -> v2[k] !== nil end)
  end

  @doc "To list: the elements of the set, in no particular order."
  @spec to_list(orset) :: [term]
  def to_list(s(values: values) = _orset) do
    Dict.keys(values)
  end

  @doc "Callback implementation for Enumerable support, do not use directly"
  @spec reduce(orset, Enumerable.acc, Enumerable.reducer) :: Enumerable.result
  def reduce(s(values: v), acc, fun) do
    do_reduce(Enum.sort(Dict.keys(v)), acc, fun)
  end

  @doc "Remove an element from the set"
  @spec remove(orset, term) :: {:ok, orset} | {:error, {:precondition, {:not_present, term()}}}
  def remove(s(values: values) = orset, elem) do
    case values[elem] do
      nil -> {:error, {:precondition, {:not_present, elem}}}
      _ -> {:ok, s(orset, values: Dict.delete(values, elem))}
    end
  end

  @doc "Remove an element from the set; fails with a MatchError if the element is not present."
  @spec remove!(orset, term) :: orset
  def remove!(orset, elem) do
    {:ok, new_orset} = remove(orset, elem)
    new_orset
  end

  @doc "Get the size of the values"
  @spec size(orset) :: non_neg_integer
  def size(s(values: values)) do
    Dict.size(values)
  end

  @doc "Get the value for this CRDT: a HashSet of the elements."
  @spec value(orset) :: Set.t
  def value(s(values: v) = _orset) do
    HashSet.new(Dict.keys(v))
  end

  @doc """
  Union is an alias for the merge operation.

  Note: This is not always the same as converting each ORSet to a HashSet and
  running union on the Sets. Because this is an alias operation for merge, deletes
  currently propagate. If you believe these semantics are invalid, make a case.
  """
  @spec union(orset, orset) :: orset
  def union(set1, set2), do: merge(set1, set2)

  # Reducer behind new/1: adds one `{actor, element}` item; any other shape
  # (or an accumulator that already is an error) yields the error tuple.
  defp add_actor_value({actor, value}, s() = acc), do: add!(acc, actor, value)
  defp add_actor_value(_item, _acc), do: {:error, :need_actor_tuple}

  # Merge common keys into a new values dictionary
  @spec merge_common_keys(Set.t, Dict.t, Dict.t) :: Dict.t
  defp merge_common_keys(common_keys, values1, values2) do
    merged = Enum.map(common_keys, fn(k) ->
      {k, VClock.merge(values1[k], values2[k])}
    end)
    HashDict.new(merged)
  end

  # Merge disjoint keys into the values dictionary.
  # An element is kept only when the other side's set clock does not descend
  # its entry clock; otherwise it is left out of the result.
  @spec merge_disjoint_keys(Dict.t, Set.t, Dict.t, vclock) :: Dict.t
  defp merge_disjoint_keys(new_values, keys, values, setclock) do
    Enum.reduce(keys, new_values, fn(k, acc) ->
      vclock = values[k]
      case VClock.descends(setclock, vclock) do
        false ->
          new_clock = VClock.subtract_dots(vclock, setclock)
          Dict.put(acc, k, new_clock)
        true ->
          acc
      end
    end)
  end

  # Checks if these updates are all disjoint.
  defp do_disjoint_detect(setclock, {_k, clock}) do
    not VClock.descends(setclock, clock)
  end

  # List-based implementation of the Enumerable reduce contract.
  defp do_reduce(_, { :halt, acc }, _fun), do: { :halted, acc }
  defp do_reduce(list, { :suspend, acc }, fun), do: { :suspended, acc, &do_reduce(list, &1, fun) }
  defp do_reduce([], { :cont, acc }, _fun), do: { :done, acc }
  defp do_reduce([h|t], { :cont, acc }, fun), do: do_reduce(t, fun.(h, acc), fun)
end
defimpl Ravel.CRDT, for: Ravel.ORSet do
  # Adapter that maps the generic CRDT protocol onto the ORSet functions.
  alias Ravel.ORSet

  def actors(crdt) do
    ORSet.actors(crdt)
  end

  # Strict comparison of the whole structures, clocks included.
  def equal?(crdt1, crdt2) do
    crdt1 === crdt2
  end

  # Compares only what value/1 yields for each set.
  def equal_value?(crdt1, crdt2) do
    left = ORSet.value(crdt1)
    right = ORSet.value(crdt2)
    left === right
  end

  def merge(crdt1, crdt2) do
    ORSet.merge(crdt1, crdt2)
  end

  def value(crdt) do
    ORSet.value(crdt)
  end
end
defimpl Enumerable, for: Ravel.ORSet do
  # Enumerable support for ORSet; every callback defers to the ORSet module.
  alias Ravel.ORSet

  def count(orset) do
    {:ok, ORSet.size(orset)}
  end

  def member?(orset, value) do
    {:ok, ORSet.member?(orset, value)}
  end

  def reduce(orset, acc, fun) do
    ORSet.reduce(orset, acc, fun)
  end
end
defimpl Inspect, for: Ravel.ORSet do
  import Inspect.Algebra

  # Shows the set clock followed by the sorted `{element, clock}` entries.
  def inspect({Ravel.ORSet, clock, values}, opts) do
    clock_text = Kernel.inspect(clock, [pretty: true])
    entries = Enum.sort(HashDict.to_list(values))
    concat ["#ORSet<", clock_text, ", ", Inspect.List.inspect(entries, opts), ">"]
  end
end
defmodule Ravel.PNCounter do
  @moduledoc """
  An increment-decrement counter.

  Each actor owns an `{increments, decrements}` pair stored in a `HashDict`.
  Both halves only ever grow; merging keeps the per-actor maximum of each half
  and the value of the counter is the sum of increments minus decrements.
  """

  @type actor :: term
  # Either half starts at 0 when only the other one has been touched.
  @type value :: {inc :: non_neg_integer, decr :: non_neg_integer}

  defrecordp :s, __MODULE__, dict: nil

  # The record is the 2-tuple `{Ravel.PNCounter, dict}`.
  @opaque pncounter :: {__MODULE__, dict :: Dict.t}

  @doc "Creates a new, empty PNCounter (its value is 0)."
  @spec new() :: pncounter
  def new() do
    s(dict: HashDict.new())
  end

  @doc "The sorted list of actors currently in the counter."
  @spec actors(pncounter) :: [actor]
  def actors(s(dict: t)) do
    Enum.sort(HashDict.keys(t))
  end

  @doc "Decrement a PNCounter by 1 on behalf of `actor`."
  @spec decrement(pncounter, actor) :: pncounter
  def decrement(counter, actor) do
    decrement(counter, actor, 1)
  end

  @doc """
  Decrement a PNCounter by `n` on behalf of `actor`.

  `n` must be a positive integer; anything else raises `FunctionClauseError`.
  """
  @spec decrement(pncounter, actor, pos_integer) :: pncounter
  # `is_integer/1` is required: atoms, tuples, etc. compare greater than any
  # number in Erlang term ordering, so `n > 0` alone lets them through.
  def decrement(s(dict: t), actor, n) when is_integer(n) and n > 0 do
    s(dict: HashDict.update(t, actor, {0, n}, fn({pos, neg}) -> {pos, neg + n} end))
  end

  @doc "Checks for the equality of the PNCounter, not the total value"
  @spec equal(pncounter, pncounter) :: boolean
  def equal(s(dict: v1), s(dict: v2)) do
    v1 == v2
  end

  @doc "Increment a PNCounter by 1 on behalf of `actor`."
  @spec increment(pncounter, actor) :: pncounter
  def increment(t, actor) do
    increment(t, actor, 1)
  end

  @doc """
  Increment a PNCounter by `n` on behalf of `actor`.

  `n` must be a positive integer; anything else raises `FunctionClauseError`.
  """
  @spec increment(pncounter, actor, pos_integer) :: pncounter
  def increment(s(dict: t), actor, n) when is_integer(n) and n > 0 do
    s(dict: HashDict.update(t, actor, {n, 0}, fn({pos, neg}) -> {pos + n, neg} end))
  end

  @doc "Merge 2 PNCounters together, keeping per actor the larger increment and decrement totals."
  @spec merge(pncounter, pncounter) :: pncounter
  def merge(s(dict: v1) = _counter1, s(dict: v2) = _counter2) do
    s(dict: HashDict.merge(v1, v2, fn(_k, {inc1, decr1}, {inc2, decr2}) ->
      {max(inc1, inc2), max(decr1, decr2)}
    end))
  end

  @doc "The total/current value of the PNCounter"
  @spec value(pncounter) :: integer
  def value(s(dict: t)) do
    Enum.reduce(t, 0, fn({_actor, {inc, decr}}, acc) -> acc + inc - decr end)
  end
end
defimpl Ravel.CRDT, for: Ravel.PNCounter do
  # Adapter that maps the generic CRDT protocol onto the PNCounter functions.
  alias Ravel.PNCounter

  def actors(crdt) do
    PNCounter.actors(crdt)
  end

  # Strict comparison of the whole counter structures.
  def equal?(crdt1, crdt2) do
    crdt1 === crdt2
  end

  # Only the net totals are compared, not the per-actor breakdown.
  def equal_value?(crdt1, crdt2) do
    left_total = PNCounter.value(crdt1)
    right_total = PNCounter.value(crdt2)
    left_total === right_total
  end

  def merge(crdt1, crdt2) do
    PNCounter.merge(crdt1, crdt2)
  end

  def value(crdt) do
    PNCounter.value(crdt)
  end
end
defimpl Inspect, for: Ravel.PNCounter do
  import Inspect.Algebra

  # Pretty mode prints only the net total; otherwise the sorted per-actor
  # `{increments, decrements}` entries are printed.
  def inspect({Ravel.PNCounter, dict} = counter, opts) do
    case opts.pretty do
      true ->
        total = Ravel.PNCounter.value(counter)
        "#PNCounter<#{total}>"
      false ->
        entries = Enum.sort(HashDict.to_list(dict))
        concat ["#PNCounter<", Inspect.List.inspect(entries, opts), ">"]
    end
  end
end
defmodule Ravel do
  @moduledoc """
  A native CRDT library for Elixir based on `riak_dt` from the great engineers at Basho.
  """

  #use Application.Behaviour

  # See http://elixir-lang.org/docs/stable/Application.Behaviour.html
  # for more information on OTP Applications

  # Application start callback: boots the top-level supervisor; both
  # arguments are ignored.
  def start(_type, _args), do: Ravel.Supervisor.start_link
end
defmodule Ravel.Supervisor do
  use Supervisor.Behaviour

  # Starts the supervisor with this module as the callback module.
  def start_link, do: :supervisor.start_link(__MODULE__, [])

  # There are no children yet.
  # Define workers and child supervisors to be supervised here, e.g.
  # worker(Ravel.Worker, [])
  #
  # See http://elixir-lang.org/docs/stable/Supervisor.Behaviour.html
  # for other strategies and supported options
  def init([]) do
    supervise([], strategy: :one_for_one)
  end
end
defmodule Ravel.VClock do
  @moduledoc """
  A vector clock implementation built on top of HashDict.

  The clock maps every actor to a counter. Actors that are not present are
  treated as having the counter 0 (see `get_counter/2`).
  """

  @type actor :: term
  @type counter :: pos_integer

  defrecordp :s, __MODULE__, dict: nil

  # The record is the 2-tuple `{Ravel.VClock, dict}`.
  @opaque vclock :: {__MODULE__, dict :: Dict.t}

  @doc "Returns a new, empty vector clock"
  @spec new() :: vclock
  def new do
    s(dict: HashDict.new())
  end

  @doc """
  Returns a new vector clock, given previous {actor, counter} pairs.

  Throws `:invalid_dict` if any element is not a 2-tuple.
  """
  @spec new([{actor(), counter()}]) :: vclock
  def new(vclock_dict) do
    # TODO: validate the vector clock.
    case Enum.all?(vclock_dict, fn({_k, _v}) -> true; (_) -> false end) do
      true -> s(dict: HashDict.new(vclock_dict))
      false -> throw :invalid_dict
    end
  end

  @doc "Returns the actors present in the clock, in no particular order (see `nodes/1` for a sorted list)."
  @spec actors(vclock) :: [actor()]
  def actors(s(dict: v)) do
    Dict.keys(v)
  end

  @doc """
  Returns true when the first clock descends from the second one, i.e. when
  every actor of the second clock is present in the first clock with a counter
  that is at least as large. Returns false otherwise.
  """
  @spec descends(vclock, vclock) :: boolean
  def descends(s(dict: v1), s(dict: v2)) do
    Enum.all?(v2, fn({k, counter2}) ->
      case Dict.fetch(v1, k) do
        {:ok, counter1} -> counter1 >= counter2
        :error -> false
      end
    end)
  end

  @doc "Returns true when v1 dominates v2: v1 descends from v2 but not the other way round."
  @spec dominates(vclock, vclock) :: boolean
  def dominates(clock1, clock2) do
    descends(clock1, clock2) and not descends(clock2, clock1)
  end

  @doc """
  Subtracts elements from the first vclock where actor counters in the second vclock are
  larger or equal.

  The result holds only the `{actor, counter}` entries of the first clock that
  the second clock has not seen yet: the actor is missing from the second
  clock, or its counter there is smaller.
  """
  @spec subtract_dots(vclock, vclock) :: vclock
  def subtract_dots(s(dict: v1), s(dict: v2)) do
    s(dict: Enum.reduce(v1, HashDict.new(), fn({k, counter1}, acc) ->
      case Dict.fetch(v2, k) do
        # Unknown actor: the second clock has never seen this dot.
        :error -> HashDict.put(acc, k, counter1)
        # The second clock is behind: the dot is still news to it.
        {:ok, counter2} when counter1 > counter2 -> HashDict.put(acc, k, counter1)
        # Already covered by the second clock: subtract it.
        {:ok, _counter2} -> acc
      end
    end))
  end

  @doc "Combines a list of vclocks; an empty list yields an empty vclock."
  @spec merge([vclock]) :: vclock
  def merge([]) do new() end
  def merge([vclock|rest]) when is_list(rest) do merge2(rest, vclock) end

  @doc "Combine 2 vclocks, keeping the larger counter for every actor"
  @spec merge(vclock, vclock) :: vclock
  def merge(v1, v2) do
    merge([v1, v2])
  end

  # Folds the remaining clocks into the accumulated clock `a`.
  defp merge2([], a) do a end
  defp merge2([s(dict: b)|clocks], s(dict: a)) do
    merged = Dict.merge(a, b, fn(_actor, c1, c2) -> max(c1, c2) end)
    merge2(clocks, s(dict: merged))
  end

  @doc "Get the counter for the given actor (0 if the actor is not in the clock)"
  @spec get_counter(vclock, actor) :: non_neg_integer
  def get_counter(s(dict: v), actor) do
    Dict.get(v, actor, 0)
  end

  @doc "Increment actor by 1"
  @spec increment(vclock, actor) :: vclock
  def increment(s(dict: v), actor) do
    s(dict: HashDict.update(v, actor, 1, &(&1 + 1)))
  end

  @doc "Return the sorted list of nodes (actors) in the clock"
  @spec nodes(vclock) :: [actor()]
  def nodes(s(dict: v)) do
    Enum.sort(HashDict.keys(v))
  end

  @doc "Return the equality of the vector clocks"
  @spec equal(vclock, vclock) :: boolean
  def equal(s(dict: v1), s(dict: v2)) do
    v1 == v2
  end

  @doc """
  Replace {new, old} actors.

  Every actor that appears as `old` in one of the `{new, old}` pairs is renamed
  to the matching `new`; its counter is kept.
  """
  @spec replace_actors(vclock, [{actor(), actor()}]) :: vclock()
  def replace_actors(s(dict: vclock), map) do
    renamed = Enum.map(vclock, fn({k, v}) ->
      case Enum.find(map, fn({_, old}) -> old == k end) do
        nil -> {k, v}
        {new, _} -> {new, v}
      end
    end)
    s(dict: HashDict.new(renamed))
  end
end
import Inspect.Algebra
defimpl Inspect, for: Ravel.VClock do
  # Renders the clock as `#VClock<[...]>` with the sorted `{actor, counter}`
  # pairs. `concat` comes from the file-level `import Inspect.Algebra`.
  def inspect({Ravel.VClock, dict}, opts) do
    counters = Enum.sort(HashDict.to_list(dict))
    concat ["#VClock<", Inspect.List.inspect(counters, opts), ">"]
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment