Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active May 17, 2024 15:12
Show Gist options
  • Save Horusiath/1091a4c3629acf995d50e506db35d5f2 to your computer and use it in GitHub Desktop.
Save Horusiath/1091a4c3629acf995d50e506db35d5f2 to your computer and use it in GitHub Desktop.
Yata move algorithm
namespace Demos
open System
/// Identifier of a replica; used as the first component of a block `ID`.
type ReplicaId = string
[<RequireQualifiedAccess>]
module Array =
/// Renders array `a` as `[x, y, z]`: elements are formatted with the core `string`
/// function and separated by `", "`. An empty array renders as `[]`.
let string (a: 't[]) =
    let sb = System.Text.StringBuilder("[")
    a |> Array.iteri (fun i item ->
        // every element but the first is preceded by a separator
        if i > 0 then sb.Append(", ") |> ignore
        sb.Append(string (box item)) |> ignore)
    sb.Append("]").ToString()
/// Returns a new array in which `item` sits at position `idx` and every element
/// previously at `idx` or later is shifted one place to the right.
/// The input `array` is left untouched.
let insert idx item array =
    let before = Array.sub array 0 idx
    let after = Array.sub array idx (Array.length array - idx)
    Array.concat [ before; [| item |]; after ]
/// Returns a new array without the element at position `idx`.
/// The input `array` is left untouched.
let removeAt idx array =
    let tailStart = idx + 1
    let head = Array.sub array 0 idx
    let tail = Array.sub array tailStart (Array.length array - tailStart)
    Array.append head tail
/// Returns a copy of `array` whose element at position `idx` is `item`.
/// The input `array` is left untouched.
let replace idx item array =
    let updated = Array.copy array
    Array.set updated idx item
    updated
/// Binary search over `array` for the first index at which `predicate` holds.
///
/// `predicate` must be monotone over the array: `false` for a (possibly empty)
/// prefix and `true` for every element after it. To find the place to insert
/// `toInsert` into an ascending array, use e.g. `fun a -> a >= toInsert`.
///
/// Returns 0 when the predicate holds for every element (or the array is empty)
/// and `array.Length` when it holds for none.
let binarySearch (predicate: 'a -> bool) (array: 'a[]) =
    let mutable lo = 0
    let mutable hi = array.Length
    while lo < hi do
        // `lo + (hi - lo) / 2` cannot overflow, unlike `(lo + hi) / 2` on very large arrays
        let mid = lo + (hi - lo) / 2
        if predicate array.[mid] then hi <- mid
        else lo <- mid + 1
    lo
namespace Demos.Yata
open System.Collections.Generic
open Demos
/// Unique block identifier: the replica that created the block paired with
/// a sequence number assigned by that replica.
type ID = ReplicaId * uint64
/// Payload carried by a block.
type Content<'t> =
    /// A live element holding a user value.
    | Value of value: 't
    /// Marker left in place of a deleted element.
    | Tombstone
    /// Marker that relocates the block identified by `from` to the position of
    /// the block carrying this content. `priority` is compared when two moves
    /// compete for the same block.
    | Moved of from: ID * priority: int
/// A single element of the YATA array together with its integration metadata.
type Block<'t> =
    { /// Unique block identifier.
      Id: ID
      /// ID of the block directly to the left at creation time, if any.
      OriginLeft: ID option
      /// ID of the block directly to the right at creation time, if any.
      OriginRight: ID option
      /// ID of the `Moved` block that currently relocates this block, if any.
      MovedTo: ID option
      /// Content of the block: a value, a tombstone or a move marker.
      Value: Content<'t> }
module Block =
    /// Returns `true` when the block's content is a `Tombstone`.
    let isDeleted (b: Block<'t>) =
        match b.Value with
        | Tombstone -> true
        | Value _ | Moved _ -> false

    /// Returns the user value stored in the block, or `None` for tombstones and move markers.
    let value (b: Block<'t>) =
        match b.Value with
        | Value v -> Some v
        | Tombstone | Moved _ -> None
/// A minimal implementation of the YATA CRDT, represented as a flat array of blocks.
/// It lacks any optimizations and was created mostly for educational purposes.
///
/// For the paper, see: https://www.researchgate.net/publication/310212186_Near_Real-Time_Peer-to-Peer_Shared_Editing_on_Extensible_Data_Types
type Yata<'t> = Block<'t> array
[<RequireQualifiedAccess>]
module Yata =
/// The zero/default/empty instance of a YATA array: it holds no blocks.
let zero: Yata<'t> = [||]
/// Looks up the position of the block identified by `id` within the YATA `array`.
/// Returns `Some index` of the first match, or `None` when no block has that ID.
let private indexOf (array: Yata<'t>) (id: ID) =
    let rec scan i =
        if i >= array.Length then None
        elif array.[i].Id = id then Some i
        else scan (i + 1)
    scan 0
/// Lazily produces `(index, block)` pairs for all visible value blocks, in an order
/// that respects moves. `index` is always the physical position of the yielded
/// block inside `array`.
let private blocks (array: Yata<'t>) = seq {
    for i in 0 .. array.Length - 1 do
        let current = array.[i]
        match current.Value with
        | Tombstone -> ()
        | Value _ ->
            // a value claimed by a move is emitted at the position of its move marker instead
            if Option.isNone current.MovedTo then yield (i, current)
        | Moved(from, _) ->
            let j = indexOf array from |> Option.get
            let source = array.[j]
            match source.Value with
            // only the move marker that currently owns the source block makes it visible here
            | Value _ when source.MovedTo = Some current.Id -> yield (j, source)
            | _ -> ()
}
/// Returns the user-visible content of a YATA `array`: just the values, in their
/// move-aware order, with all metadata and tombstones stripped.
let value (array: Yata<'t>) : 't[] =
    [|
        for (_, block) in blocks array do
            match block.Value with
            | Value v -> yield v
            | Tombstone | Moved _ -> ()
    |]
/// Renders a YATA array of characters as a string made of its visible values.
let str (array: Yata<char>) : string =
    let chars: char[] = value array
    System.String(chars)
/// Maps a user-defined `index`, counted over visible values only, onto the matching
/// `(physicalIndex, block)` pair. Returns `None` when no visible value remains
/// after skipping `index` of them.
let private findPosition (index: int) (array: Yata<'t>) =
    let visible = blocks array
    visible |> Seq.skip index |> Seq.tryHead
/// Returns the highest sequence number among blocks created by `replicaId`,
/// or 0 when `array` holds no block from that replica.
let private lastSeqNr replicaId (array: Yata<'t>) =
    array
    |> Array.fold (fun highest block ->
        let (owner, seqNr) = block.Id
        if owner = replicaId then max highest seqNr else highest) 0UL
/// Bounds-checked indexer: returns the block at position `i`,
/// or `None` when `i` lies outside of `array`.
let private getBlock (i: int) (array: Yata<'t>) : Option<Block<'t>> =
    if 0 <= i && i < array.Length then Some array.[i]
    else None
/// Determines the index at which `block` has to be inserted into the YATA `array`,
/// taking into account blocks that other replicas may have inserted concurrently
/// between the block's origins.
///
/// `left` / `right` are the physical indices of the block's left and right origins,
/// `i` is the cursor walking rightwards, `dst` is the current candidate position,
/// and `scanning` tells whether the candidate is frozen while the cursor advances.
let rec private findInsertIndex (array: Yata<'t>) block scanning left right dst i =
    // while not scanning, the candidate position follows the cursor
    let dst = if scanning then dst else i
    if i = right || i = Array.length array then dst
    else
        let other = array.[i]
        let otherLeft =
            match Option.bind (indexOf array) other.OriginLeft with
            | Some idx -> idx
            | None -> -1
        let otherRight =
            match Option.bind (indexOf array) other.OriginRight with
            | Some idx -> idx
            | None -> array.Length
        let ours = fst block.Id
        let theirs = fst other.Id
        let sameOrigins = otherLeft = left && otherRight = right
        if otherLeft < left || (sameOrigins && ours <= theirs) then dst
        else
            // conflicts sharing our left origin are ordered by replica identifier
            let scanning = if otherLeft = left then ours <= theirs else scanning
            findInsertIndex array block scanning left right dst (i + 1)
/// Applies the effect of a `Moved` block: marks its target block as relocated by
/// `block`, unless the target is already claimed by a move that takes precedence.
/// Blocks with any other content are ignored.
///
/// NOTE: `array` is mutated in place and then returned.
let private integrateMoved block (array: Yata<'t>) =
    match block.Value with
    | Moved(targetId, prio) ->
        let targetIdx = indexOf array targetId |> Option.get
        let target = array.[targetIdx]
        let claim () = array.[targetIdx] <- { target with MovedTo = Some block.Id }
        match target.MovedTo with
        | None -> claim ()
        | Some rivalId ->
            // the target was already moved by another operation:
            // the move with the higher priority (then higher ID) wins
            let rival = array.[indexOf array rivalId |> Option.get]
            match rival.Value with
            | Moved(_, rivalPrio) ->
                let wins = prio > rivalPrio || (prio = rivalPrio && block.Id > rival.Id)
                if wins then claim ()
            | Value _ | Tombstone -> () // this shouldn't happen
    | Value _ | Tombstone -> ()
    array
/// Places `block` into the YATA `array` at the position determined by its left and
/// right origins, returning a new array. Shared by `insert`, `move` and `merge`.
///
/// Fails when the block's sequence number does not directly follow the last one
/// known for its replica.
let private integrate (block: Block<'t>) (array: Yata<'t>) : Yata<'t> =
    let (id, seqNr) = block.Id
    let last = lastSeqNr id array
    if last = seqNr - 1UL then
        // translate an optional origin ID into a physical index, with a fallback for "no origin"
        let resolve fallback origin =
            match Option.bind (indexOf array) origin with
            | Some idx -> idx
            | None -> fallback
        let left = resolve (-1) block.OriginLeft
        let right = resolve (Array.length array) block.OriginRight
        let start = left + 1
        let i = findInsertIndex array block false left right start start
        // `Array.insert` copies, so callers are free to mutate the result
        Array.insert i block array
    else
        // since we operate on left/right origins, gaps between blocks cannot be allowed
        failwithf "operation out of order: tried to insert after (%s,%i): %O" id last block
/// Inserts `value` at the user-visible position `index` of the YATA `array`,
/// on behalf of replica `replicaId`. Returns the updated array.
let insert (replicaId: ReplicaId) (index: int) (value: 't) (array: Yata<'t>) : Yata<'t> =
    // the block currently visible at `index` becomes the right origin
    let (i, right) =
        match findPosition index array with
        | Some (pos, successor) -> (pos, Some successor.Id)
        | None -> (array.Length, None)
    let seqNr = lastSeqNr replicaId array + 1UL
    // the physical neighbour to the left becomes the left origin
    let left =
        match getBlock (i - 1) array with
        | Some predecessor -> Some predecessor.Id
        | None -> None
    let block =
        { Id = (replicaId, seqNr)
          OriginLeft = left
          OriginRight = right
          MovedTo = None
          Value = Value value }
    integrate block array
/// Deletes the element visible at `index`. The block is not removed: its content is
/// replaced with a tombstone, so that it can still serve as a reference point
/// (origin) for potential concurrent operations.
let delete (index: int) (blocks: Yata<'t>) : Yata<'t> =
    let (i, live) = Option.get (findPosition index blocks)
    blocks |> Array.replace i { live with Value = Tombstone }
/// Merges two YATA arrays together and returns the result as a new array;
/// neither `a` nor `b` is modified.
///
/// Blocks tombstoned in `b` become tombstones in the result, blocks of `b` that are
/// missing from `a` are integrated once everything they depend on is present, and
/// finally the move markers that came from `b` are applied.
///
/// Fails when some block of `b` can never be integrated, i.e. one of its origins or
/// a preceding block of the same replica is absent from both arrays.
let merge (a: Yata<'t>) (b: Yata<'t>) : Yata<'t> =
    // IDs of the blocks that have been tombstoned in `b` (a set keeps lookups cheap)
    let tombstones =
        b
        |> Array.choose (fun block -> if Block.isDeleted block then Some block.Id else None)
        |> Set.ofArray
    // tombstone existing elements; `Array.map` yields a fresh copy, so the in-place
    // updates done by `integrateMoved` below never touch the caller's array
    let mutable a =
        a
        |> Array.map (fun block ->
            if not (Block.isDeleted block) && Set.contains block.Id tombstones
            then { block with Value = Tombstone } // mark block as deleted
            else block)
    // IDs of blocks already existing in `a`
    let mutable seen = a |> Array.map (fun block -> block.Id) |> Set.ofArray
    // deduplicate blocks already existing in current array `a`
    let blocks = b |> Array.filter (fun block -> not (Set.contains block.Id seen))
    let mutable remaining = blocks.Length
    let isPresent seen origin =
        match origin with
        | Some id -> Set.contains id seen
        | None -> true
    while remaining > 0 do
        let mutable progressed = false
        for block in blocks do
            let (replicaId, seqNr) = block.Id
            // a block is ready when it was not inserted yet, both of its origins are
            // present and it directly follows the last known block of its replica
            // (physical order in `b` does not have to match creation order)
            let canInsert =
                not (Set.contains block.Id seen)
                && isPresent seen block.OriginLeft
                && isPresent seen block.OriginRight
                && lastSeqNr replicaId a = seqNr - 1UL
            if canInsert then
                a <- integrate block a
                seen <- Set.add block.Id seen
                remaining <- remaining - 1
                progressed <- true
        // a full pass without a single insertion means the rest can never be
        // integrated: fail instead of looping forever
        if not progressed then
            failwithf "cannot merge: %d block(s) depend on blocks missing from both arrays" remaining
    for block in blocks do
        a <- integrateMoved block a
    a
/// Incremental update exchanged between peers: a YATA array holding only the new
/// blocks, paired with a delete set (IDs of deleted blocks).
type Delta<'t> = Yata<'t> * ID array
/// Computes the version of a YATA array: for every replica that created at least
/// one block, the highest sequence number found in `a`.
let version (a: Yata<'t>) : VTime =
    (Version.zero, a)
    ||> Array.fold (fun vtime block ->
        let (replicaId, seqNr) = block.Id
        let highest =
            match Map.tryFind replicaId vtime with
            | Some known -> max known seqNr
            | None -> seqNr
        Map.add replicaId highest vtime)
/// Computes a delta of state `a` relative to `version`, a version vector produced
/// by `Yata.version` on another peer. The delta pairs a YATA array holding only the
/// blocks that appeared after `version` with a delete set: the IDs of all deleted
/// blocks in `a`.
let delta (version: VTime) (a: Yata<'t>) : Delta<'t> =
    // a block is new when its replica is unknown to `version` or its number is higher
    let isNewer (block: Block<'t>) =
        let (replicaId, seqNr) = block.Id
        match Map.tryFind replicaId version with
        | Some observed -> seqNr > observed
        | None -> true
    // Note: in practical implementation delete set can be compressed.
    let deleteSet = [| for block in a do if Block.isDeleted block then yield block.Id |]
    (Array.filter isNewer a, deleteSet)
/// Merges a remote `delta` into the local YATA array `a`: the delta's blocks are
/// merged first, then every block listed in its delete set is tombstoned.
let mergeDelta (delta: Delta<'t>) (a: Yata<'t>) : Yata<'t> =
    let (b, deleteSet) = delta
    let applyDeleteSet (block: Block<'t>) =
        if Block.isDeleted block || not (Array.contains block.Id deleteSet) then block
        else { block with Value = Tombstone } // tombstone block
    merge a b |> Array.map applyDeleteSet
/// Moves the element visible at position `src` so that it appears at position `dst`,
/// on behalf of replica `replicaId`. The element's block stays where it is: a new
/// `Moved` marker block referring to it is integrated at the destination instead.
let move (replicaId: ReplicaId) (src: int) (dst: int) (array: Yata<'t>) : Yata<'t> =
    let (_, moved) = findPosition src array |> Option.get
    // the block currently visible at `dst` becomes the right origin of the marker
    let (dst, right) =
        match findPosition dst array with
        | Some (i, successor) -> (i, Some successor.Id)
        | None -> (array.Length, None)
    let seqNr = lastSeqNr replicaId array + 1UL
    let left = getBlock (dst - 1) array |> Option.map (fun b -> b.Id)
    let prio =
        // if our source was already moved by another block, outrank that move by one
        match moved.MovedTo |> Option.bind (indexOf array) with
        | Some idx ->
            match array.[idx].Value with
            | Moved(_, previous) -> previous + 1
            | Value _ | Tombstone -> 0
        | None -> 0
    let block =
        { Id = (replicaId, seqNr)
          OriginLeft = left
          OriginRight = right
          MovedTo = None
          Value = Moved(moved.Id, prio) }
    integrate block array |> integrateMoved block
module Demos.Yata.Tests
open Expecto
open Demos.Yata
/// Replica identifier of the first peer used by the tests.
[<Literal>]
let A = "A"

/// Replica identifier of the second peer used by the tests.
[<Literal>]
let B = "B"
/// Convergence tests for `Yata.merge`: every scenario creates two diverging replicas
/// and checks that merging in either direction yields the same result.
[<Tests>]
let tests = testList "Yata" [
    // concurrent inserts at the same position
    test "merge insert" {
        let a =
            Yata.zero
            |> Yata.insert A 0 'a'
            |> Yata.insert A 1 'd'
        let b = a |> Yata.insert B 1 'c'
        let a = a |> Yata.insert A 1 'b'
        Expect.equal (Yata.str a) "abd" "pre-merge (A)"
        Expect.equal (Yata.str b) "acd" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "abcd" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }

    // an insert concurrent with a delete
    test "merge insert/delete" {
        let a =
            Yata.zero
            |> Yata.insert A 0 'a'
            |> Yata.insert A 1 'd'
        let b = a |> Yata.insert B 1 'c'
        let a = a |> Yata.delete 1
        Expect.equal (Yata.str a) "a" "pre-merge (A)"
        Expect.equal (Yata.str b) "acd" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "ac" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }

    // two replicas concurrently move the same element
    // (plain `test`, not a focused `ftest`, so the tests above are not skipped)
    test "merge moved" {
        let a =
            Yata.zero
            |> Yata.insert A 0 'a'
            |> Yata.insert A 1 'b'
            |> Yata.insert A 2 'c'
        let b = a |> Yata.move B 1 3
        let a = a |> Yata.move A 1 0
        Expect.equal (Yata.str a) "bac" "pre-merge (A)"
        Expect.equal (Yata.str b) "acb" "pre-merge (B)"
        let a1 = Yata.merge a b
        let a2 = Yata.merge b a
        Expect.equal (Yata.str a1) "acb" "post-merge"
        Expect.equal a1 a2 "post-merge commutativity"
    }
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment