A simple Reliable Causal Broadcast implementation using F# and Akka.NET
module Program =
    // the DemoFs.RCB module (Db, Event, ORSet, Connect, ...) is defined below
    open Akkling
    open FSharp.Control
    open DemoFs.RCB

    type InMemoryDb(replica: ReplicaId) =
        let snapshot = ref null
        let mutable events : Map<uint64,obj> = Map.empty
        interface Db with
            member _.SaveSnapshot state = async { snapshot := (box state) }
            member _.LoadSnapshot<'s>() = async {
                match !snapshot with
                | :? 's as state -> return Some state
                | _ -> return None }
            member _.LoadEvents<'e>(from) = asyncSeq {
                for (seqNr, e) in Map.toSeq events do
                    if seqNr >= from then
                        let casted : Event<'e> = downcast e
                        yield casted }
            member _.SaveEvents evts = async {
                for event in evts do
                    // events are keyed by their local sequence number
                    events <- Map.add event.LocalSeqNr (box event) events }

    let main () =
        let sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"
        let a = spawn sys "A" <| props (ORSet.props (InMemoryDb "A") "A")
        let b = spawn sys "B" <| props (ORSet.props (InMemoryDb "B") "B")
        async {
            a <! Connect("B", b)
            b <! Connect("A", a)
            let! state = ORSet.add 1L a
            printfn "State on node A (first): %A" state
            do! Async.Sleep 1000
            let! state = ORSet.add 2L a
            printfn "State on node A (second): %A" state
            do! Async.Sleep 5000
            let! state = ORSet.query b
            printfn "State on node B (after sync): %A" state
            let! state = ORSet.remove 2L b
            printfn "State on node B (after update): %A" state
            do! Async.Sleep 5000
            let! state1 = ORSet.query a
            let! state2 = ORSet.query b
            assert (state1 = state2)
            printfn "SUCCESS"
        } |> Async.RunSynchronously
        0
/// Reliable causal broadcast implementation
module DemoFs.RCB

open Akka.Actor
open Akkling
open System
open FSharp.Control

type ReplicaId = String

type Ord =
    | Lt = -1 // lower
    | Eq = 0  // equal
    | Gt = 1  // greater
    | Cc = 2  // concurrent

[<RequireQualifiedAccess>]
module Helpers =

    /// Helper function providing insert-or-update semantics for Map.
    let upsert k v fn map =
        match Map.tryFind k map with
        | None -> Map.add k v map
        | Some v -> Map.add k (fn v) map
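
// The sequence CRDTs below (Rga, BWRga, LSeq) use `Array.insert`, `Array.replace` and `Array.removeAt`
// helpers which are not included in this gist (presumably they live in a shared helper file of the
// original repository). A minimal sketch of what they are assumed to look like:
[<RequireQualifiedAccess>]
module Array =

    /// Returns a copy of `array` with `item` inserted at `index`.
    let insert index item (array: 'a[]) =
        let result = Array.zeroCreate (array.Length + 1)
        Array.blit array 0 result 0 index
        result.[index] <- item
        Array.blit array index result (index+1) (array.Length - index)
        result

    /// Returns a copy of `array` with the element at `index` replaced by `item`.
    let replace index item (array: 'a[]) =
        let result = Array.copy array
        result.[index] <- item
        result

    /// Returns a copy of `array` with the element at `index` removed.
    let removeAt index (array: 'a[]) =
        Array.append (Array.sub array 0 index) (Array.sub array (index+1) (array.Length - index - 1))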
type VTime = Map<ReplicaId, int64>

[<RequireQualifiedAccess>]
module Version =

    let zero: VTime = Map.empty

    let inc r (vv: VTime): VTime = vv |> Helpers.upsert r 1L ((+) 1L)

    let set r ts (vv: VTime): VTime = Map.add r ts vv

    let merge (vv1: VTime) (vv2: VTime) =
        vv2 |> Map.fold (fun acc k v2 -> Helpers.upsert k v2 (max v2) acc) vv1

    let compare (a: VTime) (b: VTime): Ord =
        let valOrDefault k map =
            match Map.tryFind k map with
            | Some v -> v
            | None -> 0L
        let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        (akeys + bkeys)
        |> Seq.fold (fun prev k ->
            let va = valOrDefault k a
            let vb = valOrDefault k b
            match prev with
            | Ord.Eq when va > vb -> Ord.Gt
            | Ord.Eq when va < vb -> Ord.Lt
            | Ord.Lt when va > vb -> Ord.Cc
            | Ord.Gt when va < vb -> Ord.Cc
            | _ -> prev) Ord.Eq
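
    // Partial-order examples for `compare` (a sketch, not part of the original source):
    //   compare (Map.ofList ["A", 1L]) (Map.ofList ["A", 1L; "B", 1L])           = Ord.Lt  // first happened before second
    //   compare (Map.ofList ["A", 2L; "B", 1L]) (Map.ofList ["A", 1L; "B", 1L])  = Ord.Gt
    //   compare (Map.ofList ["A", 2L; "B", 1L]) (Map.ofList ["A", 1L; "B", 2L])  = Ord.Cc  // concurrent updates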
/// A container for user-defined events of type 'e. It carries the metadata necessary
/// to partially order and distribute events in a peer-to-peer fashion.
type Event<'e> =
    { /// The replica on which this event was originally created.
      Origin: ReplicaId
      /// The sequence number given by the origin replica at the moment of event creation.
      /// This allows us to keep track of replication progress with remote replicas even
      /// when we didn't receive their events directly, but via an intermediate replica.
      OriginSeqNr: uint64
      /// The sequence number given by the local replica. For events created by the current replica
      /// it's the same as `OriginSeqNr`. For replicated events it's usually higher.
      LocalSeqNr: uint64
      /// Vector clock which describes happened-before relationships between events from
      /// different replicas, enabling us to establish a partial order among them.
      Version: VTime
      /// A user-defined event data.
      Data: 'e }
    override this.ToString() = sprintf "(%s, %i, %i, %A, %O)" this.Origin this.OriginSeqNr this.LocalSeqNr this.Version this.Data
type Endpoint<'s,'c,'e> = IActorRef<Protocol<'s,'c,'e>>

and Protocol<'s,'c,'e> =
    /// Attaches another replica to continuously synchronize with the current one.
    | Connect of replicaId:ReplicaId * Endpoint<'s,'c,'e>
    /// Request to read up to `maxCount` events from a given replica, starting from `seqNr`. Additionally a `filter`
    /// is provided to deduplicate possible events on the sender side (it will then be used a second time on the receiver side).
    /// This message is expected to be replied to with `Replicated`, which contains all events satisfying the seqNr/filter criteria.
    | Replicate of seqNr:uint64 * maxCount:int * filter:VTime * replyTo:Endpoint<'s,'c,'e>
    | ReplicateTimeout of ReplicaId
    /// Response to `Replicate` - must always be sent. Empty content notifies about the end of the event stream. `toSeqNr` informs
    /// up to which sequence number this message advanced.
    | Replicated of from:ReplicaId * toSeqNr:uint64 * events:Event<'e>[]
    /// Request for the state. It should be replied to with the result of applying `Crdt.Query` to `ReplicationState.Crdt`.
    | Query
    /// Persists an event into the current replica. Replied to with the updated, materialized state after success.
    | Command of 'c
    /// Message sent at the beginning of the recovery phase with the latest persisted snapshot of the state (if there was any).
    | Loaded of ReplicationState<'s>
    /// Periodic trigger to persist the current state snapshot (only performed if the state has changed since the last snapshot, as tracked by the IsDirty flag).
    | Snapshot
and ReplicationState<'s> =
    { /// Unique identifier of a given replica/node.
      Id: ReplicaId
      /// Tells if the replication state has been modified after being persisted.
      IsDirty: bool
      /// Counter used to assign a unique sequence number to the events stored locally.
      SeqNr: uint64
      /// Version vector describing the last observed event.
      Version: VTime
      /// Sequence numbers of remote replicas. When synchronizing (via `Replicate` message) with remote replicas,
      /// we start doing so from the last known sequence numbers we received.
      Observed: Map<ReplicaId, uint64>
      /// CRDT object that is replicated.
      Crdt: 's }

[<RequireQualifiedAccess>]
module ReplicationState =

    let inline create (id: ReplicaId) state = { Id = id; IsDirty = false; SeqNr = 0UL; Version = Map.empty; Observed = Map.empty; Crdt = state }

    /// Checks if a given event has NOT been observed by the replica described by `state`. Unseen events are those which
    /// have a sequence number higher than the highest sequence number observed for `nodeId` AND whose version vectors were not
    /// observed (meaning they are either greater than or concurrent to the current node's version).
    let unseen nodeId (state: ReplicationState<'s>) (e: Event<'e>) =
        match Map.tryFind nodeId state.Observed with
        | Some ver when e.OriginSeqNr <= ver -> false
        | _ -> (Version.compare e.Version state.Version) > Ord.Eq
[<Interface>]
type Db =
    abstract SaveSnapshot: 's -> Async<unit>
    abstract LoadSnapshot: unit -> Async<'s option>
    abstract LoadEvents: startSeqNr:uint64 -> AsyncSeq<Event<'e>>
    abstract SaveEvents: events:Event<'e> seq -> Async<unit>

/// Uses the database cursor (`events`) to read up to `count` elements and sends them to the `target` as a `Replicated` message.
/// The `filter` version vector is used to skip events that have already been seen by the `target`
/// (e.g. events it produced itself or already received from another replica).
let replay (nodeId: ReplicaId) (filter: VTime) (target: Endpoint<'s,'c,'e>) (events: AsyncSeq<Event<'e>>) (count: int) = async {
    let buf = ResizeArray()
    let mutable cont = count > 0
    let mutable i = 0
    let mutable lastSeqNr = 0UL
    use cursor = events.GetEnumerator()
    while cont do
        match! cursor.MoveNext() with
        | Some e ->
            if Version.compare e.Version filter > Ord.Eq then
                buf.Add(e)
                i <- i + 1
                cont <- i < count
            lastSeqNr <- Math.Max(lastSeqNr, e.LocalSeqNr)
        | _ -> cont <- false
    let events = buf.ToArray()
    target <! Replicated(nodeId, lastSeqNr, events)
}
let recoverTimeout = TimeSpan.FromSeconds 5.

type ReplicationStatus<'s,'c,'e> =
    { /// Access point for the remote replica.
      Endpoint: Endpoint<'s,'c,'e>
      /// Cancellation token for a pending `ReplicateTimeout`.
      Timeout: ICancelable }

[<Interface>]
type Crdt<'crdt,'state,'cmd,'event> =
    /// Get a default (zero) value of the CRDT.
    abstract Default: 'crdt
    /// Given a CRDT state, returns the actual value the user is interested in. Eg. an ORSet still has to carry
    /// metadata timestamps, however from the user's perspective the materialized value of an ORSet is just an ordinary Set<'a>.
    abstract Query: 'crdt -> 'state
    /// Equivalent of the command handler in event sourcing terms.
    abstract Prepare: state:'crdt * command:'cmd -> 'event
    /// Equivalent of the event handler in event sourcing terms.
    abstract Effect: state:'crdt * event:Event<'event> -> 'crdt
let replicator (crdt: Crdt<'crdt,'state,'cmd,'event>) (db: Db) (id: ReplicaId) (ctx: Actor<Protocol<_,_,_>>) =

    /// Cancels the last pending `ReplicateTimeout` task and schedules it again.
    let refreshTimeouts nodeId progresses (ctx: Actor<_>) =
        let p = Map.find nodeId progresses
        p.Timeout.Cancel()
        let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
        Map.add nodeId { p with Timeout = timeout } progresses

    let rec active (db: Db) (state: ReplicationState<'crdt>) (replicatingNodes: Map<ReplicaId, ReplicationStatus<'crdt,'cmd,'event>>) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Query ->
            ctx.Sender() <! crdt.Query state.Crdt
            return! active db state replicatingNodes ctx

        | Replicate(from, count, filter, sender) ->
            logDebugf ctx "received replication request from %s: seqNr=%i, vt=%O" sender.Path.Name from filter
            let cursor = db.LoadEvents(from)
            replay state.Id filter sender cursor count |> Async.Start
            return! active db state replicatingNodes ctx

        | Replicated(nodeId, lastSeqNr, [||]) ->
            // if we received an empty event list, this node is up to date with `nodeId`;
            // just schedule the timeout, so that when it fires we ask to replicate again
            logDebugf ctx "%s reached end of updates" nodeId
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            let observedSeqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            if lastSeqNr > observedSeqNr then
                let nstate = { state with Observed = Map.add nodeId lastSeqNr state.Observed }
                do! db.SaveSnapshot nstate
                return! active db nstate prog ctx
            else
                return! active db state prog ctx

        | Replicated(nodeId, lastSeqNr, events) ->
            let mutable nstate = state
            let mutable remoteSeqNr = Map.tryFind nodeId nstate.Observed |> Option.defaultValue 0UL
            let toSave = ResizeArray()
            // for all events not seen by the current node, rewrite them to use a local sequence nr, update the state
            // and save them in the database
            for e in events |> Array.filter (ReplicationState.unseen nodeId state) do
                logDebugf ctx "replicating event %O from replica %s" e nodeId
                let seqNr = nstate.SeqNr + 1UL
                let version = Version.merge nstate.Version e.Version // update the current node's version vector
                remoteSeqNr <- Math.Max(remoteSeqNr, e.LocalSeqNr) // advance the observed remote sequence nr
                let nevent = { e with LocalSeqNr = seqNr }
                nstate <- { nstate with
                              Crdt = crdt.Effect(nstate.Crdt, nevent)
                              SeqNr = seqNr
                              Version = version
                              Observed = Map.add nodeId remoteSeqNr nstate.Observed }
                toSave.Add nevent
            do! db.SaveEvents toSave // save all unseen events together with the updated state
            //do! db.SaveSnapshot nstate // in practice the snapshot should be saved conditionally (ideally in the same transaction as the events)
            let target = Map.find nodeId replicatingNodes
            target.Endpoint <! Replicate(lastSeqNr+1UL, 100, nstate.Version, ctx.Self) // continue syncing
            let prog = refreshTimeouts nodeId replicatingNodes ctx
            return! active db { nstate with IsDirty = true } prog ctx

        | ReplicateTimeout nodeId ->
            // if we didn't receive a Replicated response in time or the last one was empty, just retry the request upon timeout
            logDebugf ctx "%s didn't reply to the read request in time" nodeId
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            let p = Map.find nodeId replicatingNodes
            p.Endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
            let prog = Map.add nodeId { p with Timeout = timeout } replicatingNodes
            return! active db state prog ctx

        | Command(cmd) ->
            let sender = ctx.Sender()
            let seqNr = state.SeqNr + 1UL
            let version = Version.inc state.Id state.Version
            let data = crdt.Prepare(state.Crdt, cmd) // handle the command, produce an event
            let event = { Origin = state.Id; OriginSeqNr = seqNr; LocalSeqNr = seqNr; Version = version; Data = data }
            let ncrdt = crdt.Effect(state.Crdt, event) // update the state with the produced event
            let nstate = { state with Version = version; SeqNr = seqNr; Crdt = ncrdt }
            // store the new event (ideally atomically together with the updated state)
            do! db.SaveEvents [event]
            logDebugf ctx "stored event %O in the database" event
            sender <! crdt.Query ncrdt // send the updated materialized CRDT state back to the sender
            return! active db { nstate with IsDirty = true } replicatingNodes ctx

        | Connect(nodeId, endpoint) ->
            // connect with the remote replica and start synchronizing with it
            let seqNr = Map.tryFind nodeId state.Observed |> Option.defaultValue 0UL
            endpoint <! Replicate(seqNr+1UL, 100, state.Version, ctx.Self)
            logDebugf ctx "connected with replica %s. Sending read request starting from %i" nodeId (seqNr+1UL)
            let timeout = ctx.Schedule recoverTimeout ctx.Self (ReplicateTimeout nodeId)
            return! active db state (Map.add nodeId { Endpoint = endpoint; Timeout = timeout } replicatingNodes) ctx

        | Snapshot when state.IsDirty ->
            logDebugf ctx "Snapshot triggered"
            let nstate = { state with IsDirty = false }
            do! db.SaveSnapshot nstate
            return! active db nstate replicatingNodes ctx

        | _ -> return Unhandled
    }

    let rec recovering (db: Db) (ctx: Actor<_>) = actor {
        match! ctx.Receive() with
        | Loaded state ->
            logDebugf ctx "Recovery phase done with state: %O" state
            ctx.UnstashAll()
            let interval = TimeSpan.FromSeconds 5.
            ctx.ScheduleRepeatedly interval interval ctx.Self Snapshot |> ignore
            return! active db state Map.empty ctx
        | _ ->
            // stash all other operations until recovery is complete
            ctx.Stash()
            return! recovering db ctx
    }

    async {
        // load the state from a DB snapshot or create a new empty one
        let! snapshot = db.LoadSnapshot()
        let mutable state = snapshot |> Option.defaultValue (ReplicationState.create id crdt.Default)
        // apply all events that happened since the snapshot was made
        for event in db.LoadEvents (state.SeqNr + 1UL) do
            state <- { state with
                         Crdt = crdt.Effect(state.Crdt, event)
                         SeqNr = event.LocalSeqNr
                         Version = Version.merge event.Version state.Version
                         Observed = Map.add event.Origin event.OriginSeqNr state.Observed }
        ctx.Self <! Loaded state
    } |> Async.Start
    recovering db ctx
[<RequireQualifiedAccess>]
module Counter =

    let private crdt =
        { new Crdt<int64,int64,int64,int64> with
            member _.Default = 0L
            member _.Query crdt = crdt
            member _.Prepare(_, op) = op
            member _.Effect(counter, e) = counter + e.Data }

    /// Used to create a replication endpoint handling the operation-based Counter protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Increments the counter maintained by the given `ref` endpoint by a given delta (can be negative).
    let inc (by: int64) (ref: Endpoint<int64,int64,int64>) : Async<int64> = ref <? Command by

    /// Retrieve the current state of the counter maintained by the given `ref` endpoint.
    let query (ref: Endpoint<int64,int64,int64>) : Async<int64> = ref <? Query
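
    // Usage sketch (assumes an actor system `sys` and a `Db` implementation such as the
    // InMemoryDb from the Program module above; identifiers here are illustrative only):
    //   let c = spawn sys "C" <| props (Counter.props (InMemoryDb "C") "C")
    //   async {
    //       let! value = Counter.inc 5L c      // 5L
    //       let! value = Counter.inc (-2L) c   // 3L
    //       let! value = Counter.query c
    //       printfn "counter = %d" value
    //   } |> Async.RunSynchronously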
[<RequireQualifiedAccess>]
module ORSet =

    type ORSet<'a> when 'a: comparison = Set<'a * VTime>

    type Command<'a> =
        | Add of 'a
        | Remove of 'a

    type Operation<'a> =
        | Added of 'a
        | Removed of Set<VTime>

    type Endpoint<'a> when 'a: comparison = Endpoint<ORSet<'a>, Command<'a>, Operation<'a>>

    let private crdt : Crdt<ORSet<'a>, Set<'a>, Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = Set.empty
            member _.Query(orset) = orset |> Set.map fst
            member _.Prepare(orset, cmd) =
                match cmd with
                | Add item -> Added(item)
                | Remove item ->
                    let timestamps =
                        orset
                        |> Set.filter (fun (i, _) -> i = item)
                        |> Set.map snd
                    Removed timestamps
            member _.Effect(orset, e) =
                match e.Data with
                | Added(item) -> Set.add (item, e.Version) orset
                | Removed(versions) -> orset |> Set.filter (fun (_, ts) -> not (Set.contains ts versions)) }

    /// Used to create a replication endpoint handling the operation-based ORSet protocol.
    let props db replicaId ctx = replicator crdt db replicaId ctx

    /// Adds a new `item` into the ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts, add wins.
    let add (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> = ref <? Command (Add item)

    /// Removes an `item` from the ORSet maintained by the given `ref` endpoint. In case of add/remove conflicts, add wins.
    let remove (item: 'a) (ref: Endpoint<'a>) : Async<Set<'a>> = ref <? Command (Remove item)

    /// Retrieve the current state of the ORSet maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<Set<'a>> = ref <? Query
[<RequireQualifiedAccess>]
module LWWRegister =

    [<Struct>]
    type LWWRegister<'a> =
        { Timestamp: struct(DateTime * ReplicaId)
          Value: 'a voption }

    type Operation<'a> = DateTime * 'a voption

    type Endpoint<'a> = Endpoint<LWWRegister<'a>, 'a voption, Operation<'a>>

    let private crdt : Crdt<LWWRegister<'a>, 'a voption, 'a voption, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Timestamp = struct(DateTime.MinValue, ""); Value = ValueNone }
            member _.Query crdt = crdt.Value
            member _.Prepare(_, value) = (DateTime.UtcNow, value)
            member _.Effect(existing, e) =
                let (at, value) = e.Data
                let timestamp = struct(at, e.Origin)
                if existing.Timestamp < timestamp then
                    { existing with Timestamp = timestamp; Value = value }
                else existing }

    /// Used to create a replication endpoint handling the operation-based LWWRegister protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Updates the value of the register maintained by the given `ref` endpoint. Concurrent updates are
    /// resolved by comparing their (timestamp, origin) pairs - the last writer wins.
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a voption> = ref <? Command value

    /// Retrieve the current value of the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a voption> = ref <? Query
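
    // Usage sketch (assumes an actor system `sys` and a `Db` implementation; identifiers are illustrative only):
    //   let r = spawn sys "R" <| props (LWWRegister.props (InMemoryDb "R") "R")
    //   async {
    //       let! v = LWWRegister.update (ValueSome "hello") r   // ValueSome "hello"
    //       let! v = LWWRegister.query r
    //       printfn "%A" v
    //   } |> Async.RunSynchronously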
[<RequireQualifiedAccess>]
module MVRegister =

    type MVRegister<'a> = (VTime * 'a voption) list

    type Endpoint<'a> = Endpoint<MVRegister<'a>, 'a voption, 'a voption>

    let private crdt : Crdt<MVRegister<'a>, 'a list, 'a voption, 'a voption> =
        { new Crdt<_,_,_,_> with
            member _.Default = []
            member _.Query crdt =
                crdt
                |> List.choose (function (_, ValueSome v) -> Some v | _ -> None)
            member _.Prepare(_, value) = value
            member _.Effect(existing, e) =
                let concurrent =
                    existing
                    |> List.filter (fun (vt, _) -> Version.compare vt e.Version = Ord.Cc)
                (e.Version, e.Data)::concurrent }

    /// Used to create a replication endpoint handling the operation-based MVRegister protocol.
    let props db replica ctx = replicator crdt db replica ctx

    /// Updates the value of the register maintained by the given `ref` endpoint. Values written concurrently
    /// on different replicas are all kept; the materialized state is therefore an 'a list.
    let update (value: 'a voption) (ref: Endpoint<'a>) : Async<'a list> = ref <? Command value

    /// Retrieve the current (possibly multiple, concurrently written) values of the register maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a list> = ref <? Query
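
    // Usage sketch (assumes two already-connected endpoints `a` and `b`; identifiers are illustrative only):
    //   async {
    //       let! _ = MVRegister.update (ValueSome 1) a
    //       let! _ = MVRegister.update (ValueSome 2) b   // concurrent with the update on `a`
    //       // once both updates have been replicated, both concurrently written values are kept:
    //       let! values = MVRegister.query a             // e.g. [1; 2]
    //   } |> Async.RunSynchronously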
[<RequireQualifiedAccess>]
module Rga =

    /// Virtual pointer - while the physical index of an element in the RGA changes as new elements are inserted or removed,
    /// a virtual pointer always stays the same. It allows tracking the item's position over time.
    type VPtr = (int * ReplicaId)

    type Vertex<'a> = (VPtr * 'a option)

    type Rga<'a> =
        { Sequencer: VPtr
          Vertices: Vertex<'a>[] }

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of after:VPtr * at:VPtr * value:'a
        | Removed of at:VPtr

    /// Checks if a given vertex has been tombstoned.
    let inline isTombstone (_, data) = Option.isNone data

    /// Maps a user-given index (which ignores tombstones) into a physical index inside of the `vertices` array.
    let private indexWithTombstones index vertices =
        let rec loop offset remaining (vertices: Vertex<'a>[]) =
            if remaining = 0 then offset
            elif isTombstone vertices.[offset] then loop (offset+1) remaining vertices // skip over tombstones
            else loop (offset+1) (remaining-1) vertices
        loop 1 index vertices // skip the head as it's always tombstoned (it serves as a reference point)

    /// Maps a given VPtr into a physical index inside of the `vertices` array.
    let private indexOfVPtr ptr vertices =
        let rec loop offset ptr (vertices: Vertex<'a>[]) =
            if ptr = fst vertices.[offset] then offset
            else loop (offset+1) ptr vertices
        loop 0 ptr vertices

    /// Recursively checks if the next vertex on the right of a given `offset`
    /// has a position higher than `ptr` and, if so, shifts the offset to the right.
    ///
    /// By design, when doing concurrent inserts, we skip over elements on the right
    /// if their position is higher than the position of the inserted element.
    let rec private shift offset ptr (vertices: Vertex<'a>[]) =
        if offset >= vertices.Length then offset // append at the end
        else
            let (next, _) = vertices.[offset]
            if next < ptr then offset
            else shift (offset+1) ptr vertices // move insertion point to the right

    /// Increments the given sequence number.
    let inline private nextSeqNr ((i, id): VPtr) : VPtr = (i+1, id)

    let private createInserted i value rga =
        let index = indexWithTombstones i rga.Vertices // starts from 1 to skip the header vertex
        let prev = fst rga.Vertices.[index-1] // get the VPtr of the previous element or the RGA's head
        let at = nextSeqNr rga.Sequencer
        Inserted(prev, at, value)

    let private createRemoved i rga =
        let index = indexWithTombstones i rga.Vertices // starts from 1 to skip the header vertex
        let at = fst rga.Vertices.[index] // get the VPtr of the element to be removed
        Removed at

    let private applyInserted (predecessor: VPtr) (ptr: VPtr) value rga =
        // find the index where the predecessor vertex can be found
        let predecessorIdx = indexOfVPtr predecessor rga.Vertices
        // adjust the index where the new vertex is to be inserted
        let insertIdx = shift (predecessorIdx+1) ptr rga.Vertices
        // update the RGA to store the highest observed sequence number
        let (seqNr, replicaId) = rga.Sequencer
        let nextSeqNr = (max (fst ptr) seqNr, replicaId)
        let newVertices = Array.insert insertIdx (ptr, Some value) rga.Vertices
        { Sequencer = nextSeqNr; Vertices = newVertices }

    let private applyRemoved ptr rga =
        // find the index where the removed vertex can be found and clear its content to tombstone it
        let index = indexOfVPtr ptr rga.Vertices
        let (at, _) = rga.Vertices.[index]
        { rga with Vertices = Array.replace index (at, None) rga.Vertices }

    let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = { Sequencer = (0,replicaId); Vertices = [| ((0,""), None) |] }
            member _.Query rga = rga.Vertices |> Array.choose snd
            member _.Prepare(rga, cmd) =
                match cmd with
                | Insert(i, value) -> createInserted i value rga
                | RemoveAt(i) -> createRemoved i rga
            member _.Effect(rga, e) =
                match e.Data with
                | Inserted(predecessor, ptr, value) -> applyInserted predecessor ptr value rga
                | Removed(ptr) -> applyRemoved ptr rga
        }

    type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

    /// Used to create a replication endpoint handling the operation-based RGA protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at the given index. To insert at the head use index 0,
    /// to push back to the tail of the sequence insert at the array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes the item stored at the provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
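
    // Usage sketch (assumes an actor system `sys` and a `Db` implementation; identifiers are illustrative only):
    //   let text = spawn sys "T" <| props (Rga.props (InMemoryDb "T") "T")
    //   async {
    //       let! s = Rga.insert 0 'a' text   // [|'a'|]
    //       let! s = Rga.insert 1 'b' text   // [|'a'; 'b'|]
    //       let! s = Rga.removeAt 0 text     // [|'b'|]
    //   } |> Async.RunSynchronously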
/// Block-wise RGA. It exposes operations for adding/removing multiple elements at once.
[<RequireQualifiedAccess>]
module BWRga =

    type Position = (int * ReplicaId)

    [<Struct>]
    type PositionOffset =
        { Position: Position; Offset: int }
        override this.ToString () = sprintf "(%i%s:%i)" (fst this.Position) (snd this.Position) this.Offset

    [<Struct>]
    type Content<'a> =
        | Content of content:'a[]
        | Tombstone of skipped:int
        member this.Slice(offset: int) =
            match this with
            | Content data ->
                let left = Array.take offset data
                let right = Array.skip offset data
                (Content left, Content right)
            | Tombstone length ->
                (Tombstone offset, Tombstone (length - offset))

    type Block<'a> =
        { Ptr: PositionOffset
          //TODO: in this approach a Block contains both user data and CRDT metadata. It's possible,
          // however, to split these apart so that all slicing manipulations are performed on blocks
          // alone. In that case the Query method could return the user data right away with no extra
          // modifications, while the user content could be stored in an optimized structure such as a Rope,
          // instead of the deeply cloned arrays used here.
          Data: Content<'a> }
        member this.Length =
            match this.Data with
            | Content data -> data.Length
            | Tombstone skipped -> skipped
        override this.ToString() =
            sprintf "%O -> %A" this.Ptr this.Data

    [<RequireQualifiedAccess>]
    module Block =

        let tombstone (block: Block<'a>) = { block with Data = Tombstone block.Length }

        let isTombstone (block: Block<'a>) = match block.Data with Tombstone _ -> true | _ -> false

        let split (offset) (block: Block<'a>) =
            if offset = block.Length then (block, None)
            else
                let ptr = block.Ptr
                let (a, b) = block.Data.Slice offset
                let left = { block with Data = a }
                let right = { Ptr = { ptr with Offset = ptr.Offset + offset }; Data = b }
                (left, Some right)

    type Rga<'a> =
        { Sequencer: Position
          Blocks: Block<'a>[] }

    type Command<'a> =
        | Insert of index:int * 'a[]
        | RemoveAt of index:int * count:int

    type Operation<'a> =
        | Inserted of after:PositionOffset * at:Position * value:'a[]
        | Removed of slices:(PositionOffset*int) list
    /// Given a user-aware index, returns the index of a block and the position inside of that block
    /// which matches the provided index.
    let private findByIndex idx blocks =
        let rec loop currentIndex consumed (idx: int) (blocks: Block<'a>[]) =
            if idx = consumed then (currentIndex, 0)
            else
                let block = blocks.[currentIndex]
                if Block.isTombstone block then
                    loop (currentIndex+1) consumed idx blocks
                else
                    let remaining = idx - consumed
                    if remaining <= block.Length then
                        // we found the position somewhere in the block
                        (currentIndex, remaining)
                    else
                        // move to the next block with the index shortened by the current block's length
                        loop (currentIndex + 1) (consumed + block.Length) idx blocks
        loop 0 0 idx blocks

    let private findByPositionOffset ptr blocks =
        let rec loop idx ptr (blocks: Block<'a>[]) =
            let block = blocks.[idx]
            if block.Ptr.Position = ptr.Position then
                if block.Ptr.Offset + block.Length >= ptr.Offset then (idx, ptr.Offset - block.Ptr.Offset)
                else loop (idx+1) ptr blocks
            else loop (idx+1) ptr blocks
        loop 0 ptr blocks

    /// Recursively checks if the next block on the right of a given `offset`
    /// has a position higher than `pos` and, if so, shifts the offset to the right.
    let rec private shift offset pos (blocks: Block<'a>[]) =
        if offset >= blocks.Length then offset // append at the tail
        else
            let next = blocks.[offset].Ptr.Position
            if next < pos then offset
            else shift (offset+1) pos blocks // move insertion point to the right

    /// Increments the given sequence number.
    let inline private nextSeqNr ((i, id): Position) : Position = (i+1, id)

    let private sliceBlocks start count blocks =
        let rec loop acc idx offset remaining (blocks: Block<'a>[]) =
            let block = blocks.[idx]
            let ptr = block.Ptr
            let ptr = { ptr with Offset = ptr.Offset + offset }
            let len = block.Length - offset
            if len > remaining then (ptr, remaining)::acc
            elif len = 0 then loop acc (idx+1) 0 remaining blocks // skip over empty blocks
            else loop ((ptr, len)::acc) (idx+1) 0 (remaining-len) blocks
        let (first, offset) = findByIndex start blocks
        loop [] first offset count blocks |> List.rev
    let private filterBlocks slices blocks =
        let rec loop (acc: ResizeArray<Block<'a>>) idx slices (blocks: Block<'a>[]) =
            match slices with
            | [] ->
                for i=idx to blocks.Length-1 do
                    acc.Add blocks.[i] // copy over the remaining blocks
                acc.ToArray()
            | (ptr, length)::tail ->
                let block = blocks.[idx]
                if block.Ptr.Position = ptr.Position then // we found a valid block
                    let currLen = block.Length
                    if block.Ptr.Offset = ptr.Offset then // the beginning of the deleted range was found
                        if currLen = length then // the deleted range exactly matches the block bounds
                            acc.Add (Block.tombstone block)
                            loop acc (idx+1) tail blocks
                        elif currLen < length then // the deleted range is longer, tombstone the current block and keep the remainder
                            acc.Add (Block.tombstone block)
                            let ptr = { ptr with Offset = ptr.Offset + currLen }
                            loop acc (idx+1) ((ptr, length-currLen)::tail) blocks
                        else // the deleted range is shorter, we need to split the current block and tombstone the left side
                            let (left, Some right) = Block.split length block
                            acc.Add (Block.tombstone left)
                            acc.Add right
                            loop acc (idx+1) tail blocks
                    elif block.Ptr.Offset < ptr.Offset && block.Ptr.Offset + currLen > ptr.Offset then // the deleted range starts inside of the current block
                        let splitPoint = ptr.Offset - block.Ptr.Offset
                        let (left, Some right) = Block.split splitPoint block
                        acc.Add left
                        if length > right.Length then // the remainder is longer than the right part, subtract it and keep it around
                            let remainder = length - right.Length
                            acc.Add (Block.tombstone right)
                            let pos = { ptr with Offset = right.Ptr.Offset + right.Length }
                            loop acc (idx+1) ((pos, remainder)::tail) blocks
                        else
                            let (del, right) = Block.split length right
                            acc.Add (Block.tombstone del)
                            right |> Option.iter acc.Add
                            loop acc (idx+1) tail blocks
                    else // the position ID is correct but the offset doesn't fit, we need to move on
                        acc.Add block
                        loop acc (idx+1) slices blocks
                else
                    acc.Add block
                    loop acc (idx+1) slices blocks
        loop (ResizeArray()) 1 slices blocks
    let private crdt (replicaId: ReplicaId) : Crdt<Rga<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default =
                let head = { Ptr = { Position = (0,""); Offset = 0 }; Data = Tombstone 0 }
                { Sequencer = (0,replicaId); Blocks = [| head |] }
            member _.Query rga = rga.Blocks |> Array.collect (fun block -> match block.Data with Content data -> data | _ -> [||])
            member _.Prepare(rga, cmd) =
                match cmd with
                | Insert(idx, slice) ->
                    let (index, offset) = findByIndex idx rga.Blocks
                    let ptr = rga.Blocks.[index].Ptr
                    let at = nextSeqNr rga.Sequencer
                    Inserted({ ptr with Offset = ptr.Offset+offset }, at, slice)
                | RemoveAt(idx, count) ->
                    let slices = sliceBlocks idx count rga.Blocks
                    Removed slices
            member _.Effect(rga, e) =
                match e.Data with
                | Inserted(after, at, slice) ->
                    let (index, split) = findByPositionOffset after rga.Blocks
                    let indexAdjusted = shift (index+1) at rga.Blocks
                    let block = rga.Blocks.[index]
                    let newBlock = { Ptr = { Position = at; Offset = 0 }; Data = Content slice }
                    let (left, right) = Block.split split block
                    let (seqNr, replicaId) = rga.Sequencer
                    let nextSeqNr = (max (fst at) seqNr, replicaId)
                    let blocks =
                        rga.Blocks
                        |> Array.replace index left
                        |> Array.insert indexAdjusted newBlock
                    match right with
                    | Some right ->
                        let blocks = blocks |> Array.insert (indexAdjusted+1) right
                        { Sequencer = nextSeqNr; Blocks = blocks }
                    | None ->
                        { Sequencer = nextSeqNr; Blocks = blocks }
                | Removed(slices) ->
                    let blocks = filterBlocks slices rga.Blocks
                    { rga with Blocks = blocks }
        }

    type Endpoint<'a> = Endpoint<Rga<'a>, Command<'a>, Operation<'a>>

    /// Used to create a replication endpoint handling the operation-based block-wise RGA protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts a `slice` of elements starting at the given index.
    let insertRange (index: int) (slice: 'a[]) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, slice))

    /// Removes `count` consecutive elements starting at the provided `index`.
    let removeRange (index: int) (count: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt(index, count))

    /// Retrieve the current state of the RGA maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
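
    // Usage sketch (assumes an actor system `sys` and a `Db` implementation; identifiers are illustrative only):
    //   let doc = spawn sys "D" <| props (BWRga.props (InMemoryDb "D") "D")
    //   async {
    //       let! s = BWRga.insertRange 0 [| 'h'; 'e'; 'l'; 'l'; 'o' |] doc   // [|'h'; 'e'; 'l'; 'l'; 'o'|]
    //       let! s = BWRga.removeRange 1 3 doc                               // [|'h'; 'o'|]
    //   } |> Async.RunSynchronously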
[<RequireQualifiedAccess>]
module LSeq =

    [<Struct;CustomComparison;CustomEquality>]
    type VPtr =
        { Sequence: byte[]; Id: ReplicaId }
        override this.ToString() =
            String.Join('.', this.Sequence) + ":" + string this.Id
        member this.CompareTo(other: VPtr) =
            // apply lexical comparison of sequence elements
            let len = min this.Sequence.Length other.Sequence.Length
            let mutable i = 0
            let mutable cmp = 0
            while cmp = 0 && i < len do
                cmp <- this.Sequence.[i].CompareTo other.Sequence.[i]
                i <- i + 1
            if cmp = 0 then
                // one of the sequences is a subsequence of the other one:
                // compare their lengths (because maybe they're the same),
                // then compare replica ids
                cmp <- this.Sequence.Length - other.Sequence.Length
                if cmp = 0 then this.Id.CompareTo other.Id else cmp
            else cmp
        // custom equality/hashing kept consistent with the custom comparison above
        override this.Equals(other) = match other with :? VPtr as vptr -> this.CompareTo(vptr) = 0 | _ -> false
        override this.GetHashCode() = hash (this.Sequence, this.Id)
        interface IComparable<VPtr> with member this.CompareTo other = this.CompareTo other
        interface IComparable with member this.CompareTo other = match other with :? VPtr as vptr -> this.CompareTo(vptr) | _ -> invalidArg "other" "cannot compare values of different types"
        interface IEquatable<VPtr> with member this.Equals other = this.CompareTo other = 0

    type Vertex<'a> = (VPtr * 'a)

    type LSeq<'a> = Vertex<'a>[]

    type Command<'a> =
        | Insert of index:int * value:'a
        | RemoveAt of index:int

    type Operation<'a> =
        | Inserted of at:VPtr * value:'a
        | Removed of at:VPtr

    /// Binary search for the index of `vptr` in an ordered sequence, looking for a place to insert
    /// an element. If `vptr` is the lowest element, 0 will be returned. If it's the highest
    /// one, lseq.Length will be returned.
    let private binarySearch vptr (lseq: LSeq<_>) =
        let mutable i = 0
        let mutable j = lseq.Length
        while i < j do
            let half = (i + j) / 2
            if vptr >= fst lseq.[half] then i <- half + 1
            else j <- half
        i

    /// Generates a byte sequence that - ordered lexically - would fit between `lo` and `hi`.
    let private generateSeq (lo: byte[]) (hi: byte[]) =
        let rec loop (acc: ResizeArray<byte>) i (lo: byte[]) (hi: byte[]) =
            let min = if i >= lo.Length then 0uy else lo.[i]
            let max = if i >= hi.Length then 255uy else hi.[i]
            if min + 1uy < max then
                acc.Add (min + 1uy)
                acc.ToArray()
            else
                acc.Add min
                loop acc (i+1) lo hi
        loop (ResizeArray (min lo.Length hi.Length)) 0 lo hi
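
    // Examples of how `generateSeq` behaves (a sketch, not part of the original source):
    //   generateSeq [||] [||]           = [| 1uy |]        // anything between the open ends
    //   generateSeq [| 1uy |] [| 3uy |] = [| 2uy |]        // fits directly in between
    //   generateSeq [| 1uy |] [| 2uy |] = [| 1uy; 1uy |]   // no room at this level, so descend one level deeper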
    let private crdt (replicaId: ReplicaId) : Crdt<LSeq<'a>, 'a[], Command<'a>, Operation<'a>> =
        { new Crdt<_,_,_,_> with
            member _.Default = [||]
            member _.Query lseq = lseq |> Array.map snd
            member _.Prepare(lseq, cmd) =
                match cmd with
                | Insert(i, value) ->
                    let left = if Array.isEmpty lseq || i = 0 then [||] else (fst lseq.[i-1]).Sequence
                    let right = if i = lseq.Length then [||] else (fst lseq.[i]).Sequence
                    let ptr = { Sequence = generateSeq left right; Id = replicaId }
                    Inserted(ptr, value)
                | RemoveAt(i) -> Removed(fst lseq.[i])
            member _.Effect(lseq, e) =
                match e.Data with
                | Inserted(ptr, value) ->
                    let idx = binarySearch ptr lseq
                    Array.insert idx (ptr, value) lseq
                | Removed(ptr) ->
                    // binarySearch returns the insertion point (one past an equal element),
                    // so step back by one to get the index of the vertex being removed
                    let idx = (binarySearch ptr lseq) - 1
                    Array.removeAt idx lseq
        }

    type Endpoint<'a> = Endpoint<LSeq<'a>, Command<'a>, Operation<'a>>

    /// Used to create a replication endpoint handling the operation-based LSeq protocol.
    let props db replicaId ctx = replicator (crdt replicaId) db replicaId ctx

    /// Inserts an `item` at the given index. To insert at the head use index 0,
    /// to push back to the tail of the sequence insert at the array length.
    let insert (index: int) (item: 'a) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (Insert(index, item))

    /// Removes the item stored at the provided `index`.
    let removeAt (index: int) (ref: Endpoint<'a>) : Async<'a[]> = ref <? Command (RemoveAt index)

    /// Retrieve an array of elements maintained by the given `ref` endpoint.
    let query (ref: Endpoint<'a>) : Async<'a[]> = ref <? Query
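
    // Usage sketch (assumes an actor system `sys` and a `Db` implementation; identifiers are illustrative only):
    //   let s = spawn sys "S" <| props (LSeq.props (InMemoryDb "S") "S")
    //   async {
    //       let! xs = LSeq.insert 0 "a" s    // [|"a"|]
    //       let! xs = LSeq.insert 1 "b" s    // [|"a"; "b"|]
    //       let! xs = LSeq.removeAt 0 s      // [|"b"|]
    //   } |> Async.RunSynchronously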