RAMP Hybrid transaction protocol implementation using Akka.NET in F#
module Demo.Ramp

open System
open Akka.Actor
open Akkling

/// A Lamport clock timestamp used as a transaction identifier - a timestamp paired with a unique node id.
/// The original paper's implementation uses a hybrid logical clock combined with a node id, encoded together into a uint64.
type TxnId = DateTime * int
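// Note: F# compares tuples structurally, so TxnId values order by timestamp first and by node id
// second, e.g. (t, 1) < (t, 2) for the same timestamp t. That total order is what the
// Last Write Wins checks below rely on.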
type Bloom = System.Collections.BitArray

[<RequireQualifiedAccess>]
module Map =
    let keys (map: Map<'k,'v>) = map |> Seq.map (fun e -> e.Key)

let private hash (key: string): int =
    let h = Akka.Util.MurmurHash.StringHash key
    h &&& Int32.MaxValue // clear the sign bit; Math.Abs would throw on Int32.MinValue
/// Simplistic implementation of an immutable Bloom filter. It makes a deep copy of the underlying bit set on every insert.
[<RequireQualifiedAccess>]
module Bloom =
    /// Bloom filter size (in bits).
    let [<Literal>] private SIZE = 64
    /// Number of hashes used per key.
    let [<Literal>] private HASH_COUNT = 3
    let empty = System.Collections.BitArray(SIZE, false)
    let add (key: string) (filter: Bloom) : Bloom =
        let copy = filter.Clone() :?> Bloom
        for i in 0..HASH_COUNT-1 do // same hash range as maybeContains below
            copy.[(hash (key + string i)) % SIZE] <- true
        copy
    let maybeContains (key: string) (filter: Bloom) : bool =
        let rec loop (i: int) (key: string) (filter: Bloom) =
            if i = HASH_COUNT then true
            elif not filter.[(hash (key + string i)) % SIZE] then false
            else loop (i+1) key filter
        loop 0 key filter
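    // Illustrative usage sketch (not part of the original gist): a key that was added is always
    // reported as possibly present (no false negatives), while a key that was never added may
    // still occasionally report true (a false positive):
    //
    //   let f = Bloom.empty |> Bloom.add "X"
    //   Bloom.maybeContains "X" f   // always true
    //   Bloom.maybeContains "Y" f   // usually false; may be a false positive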
/// Value bucket used by the RAMP Hybrid algorithm:
/// - RAMP Fast would use a list of transaction keys instead of a Bloom filter: this gives 1RTT gets in every case, at
///   the cost of heavier data items (metadata size varies with the number of keys in the transaction).
/// - RAMP Small needs no extra metadata, at the cost of a constant 2 round trips per get request.
/// - RAMP Hybrid uses Bloom filters as metadata - in the happy case this means 1RTT; when the Bloom filter cannot
///   give a definite answer, a get request takes 2RTT. Either way the metadata has constant size.
[<Struct>]
type Record =
    { value: byte[]   // value stored - we use Last Write Wins semantics
      txnId: TxnId    // a transaction timestamp for this data item
      filter: Bloom } // RAMP Hybrid bloom filter
    override this.ToString() =
        let bf =
            let sb = System.Text.StringBuilder()
            for bit in Seq.cast<bool> this.filter do // BitArray is a non-generic IEnumerable
                sb.Append(if bit then '1' else '0') |> ignore
            sb.ToString()
        sprintf "(value: %A, txn: %O, filter: %O)" this.value this.txnId bf
type Message =
    | ReadAll of keys:Set<string> * timeout:TimeSpan             // user request for a RAMP Hybrid read transaction
    | ReadAllReply of entries:Map<string,byte[]>                 // reply to `ReadAll`
    | WriteAll of entries:Map<string,byte[]> * timeout:TimeSpan  // user request for a RAMP Hybrid write transaction
    | WriteAllReply                                              // reply to `WriteAll`
    | Read of key:string * deps:TxnId[]                          // partition entry request - transaction dependencies attached
    | ReadReply of key:string * Record option                    // reply to `Read`
    | Prepare of TxnId * deadline:DateTime * key:string * Record // prepare-phase msg for a single entry of a `WriteAll` request
    | Prepared of TxnId                                          // reply to `Prepare`
    | Commit of TxnId * key:string                               // commit-phase msg for a single entry after `Prepared` was received
    | Committed of TxnId                                         // reply to `Commit`
    | VacuumTick                                                 // tick used for vacuum cleaning
    | Abort                                                      // abort transaction
    | Aborted                                                    // reply to service on `Abort` request
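// For orientation, the happy-path message flow implied by the handlers below (illustrative):
//
//   read:  client -> Service: ReadAll
//          Service spawns a ReadCoordinator and sends Read to each partition on its behalf
//          Partition -> ReadCoordinator: ReadReply (a second round with deps follows if siblings are detected)
//          ReadCoordinator -> client: ReadAllReply
//
//   write: client -> Service: WriteAll
//          Service spawns a WriteCoordinator and sends Prepare to each partition on its behalf
//          Partition -> WriteCoordinator: Prepared, then WriteCoordinator -> Partition: Commit
//          Partition -> WriteCoordinator: Committed
//          WriteCoordinator -> client: WriteAllReply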
/// A partition manages the entries of some fragment of the key-space. These entries are multi-versioned (MVCC).
/// Partitions can be distributed across multiple nodes. Normally one would manage their placement with a
/// distributed hash table (DHT) and a handover protocol as nodes come and go, but that's beyond the purpose
/// of this demo.
module Partition =

    type private State =
        { versions: Map<string, Map<TxnId, Record>> // MVCC dictionary - entries are versioned by timestamp
          lastCommit: Map<string, TxnId>            // the latest committed transaction for each entry
          deadlines: Map<TxnId, DateTime>           // completion deadline given to each active transaction
          vacuum: ICancelable option }              // MVCC vacuum cleaner scheduled task

    let rec private ready (state: State) (ctx: Actor<obj>) = actor {
        let! msg = ctx.Receive()
        let sender = ctx.Sender()
        match msg with
        | :? Message as msg ->
            match msg with
            | Read(key, deps) ->
                let reply =
                    if Array.isEmpty deps then
                        // dependencies are empty on the 1st round of a read transaction - just return the latest entry
                        match Map.tryFind key state.lastCommit, Map.tryFind key state.versions with
                        | Some ts, Some versions -> Map.tryFind ts versions
                        | _, _ -> None
                    else
                        // on the 2nd round return the specific version requested
                        let versions = Map.tryFind key state.versions |> Option.defaultValue Map.empty
                        let recentTimestamps = Map.keys versions |> Seq.sortDescending
                        let found = recentTimestamps |> Seq.tryFind (fun version -> Array.contains version deps)
                        found |> Option.map (fun version -> Map.find version versions)
                logInfof ctx "received read for '%s' with %i deps" key deps.Length
                sender <! ReadReply(key, reply)
                return! ready state ctx
            | Prepare(txn, deadline, key, value) ->
                logInfof ctx "received prepare for '%s'=%O" key value
                let versions = Map.tryFind key state.versions |> Option.defaultValue Map.empty
                let versions = Map.add txn value versions
                let deadlines = Map.add txn deadline state.deadlines
                let state = { state with versions = Map.add key versions state.versions; deadlines = deadlines }
                sender <! Prepared txn
                return! ready state ctx
            | Commit(txn, key) ->
                logInfof ctx "received commit for '%s' with txn id: %O" key txn
                let old = Map.tryFind key state.lastCommit |> Option.defaultValue (DateTime.MinValue, 0)
                let state =
                    if old >= txn then state // use Last Write Wins semantics
                    else { state with lastCommit = Map.add key txn state.lastCommit }
                sender <! Committed txn
                return! ready state ctx
            | VacuumTick ->
                // drop uncommitted versions belonging to transactions past their deadline -
                // this gives every transaction a hard execution time limit
                let mutable count = 0
                let cutoff = DateTime.UtcNow - TimeSpan.FromMinutes 1. // 1min grace period for clock drift
                // collect transactions whose deadline has already passed (grace period included)
                let overdue =
                    state.deadlines
                    |> Seq.choose (fun e -> if e.Value < cutoff then Some e.Key else None)
                    |> Set.ofSeq
                let versions =
                    state.versions
                    |> Map.map (fun key versions ->
                        versions
                        |> Map.filter (fun txn item ->
                            match Map.tryFind key state.lastCommit with
                            | Some ts when ts = txn -> true // this is the committed entry
                            | None -> true // we cannot guarantee the transaction finished - it may have passed the point of no return
                            | _ ->
                                if Set.contains item.txnId overdue then
                                    // drop versions of transactions that went over their deadline
                                    count <- count + 1
                                    false
                                else true
                        )
                    )
                if count > 0 then
                    logInfof ctx "vacuum removed %i outdated entries" count
                // reschedule the vacuum - we don't use a repeating scheduler since we don't know how long
                // a vacuum pass will take; it could also work incrementally, e.g. scanning only a limited
                // number of entries on each tick
                let vacuum = ctx.Schedule (TimeSpan.FromMinutes 1.) (retype ctx.Self) VacuumTick
                let deadlines = overdue |> Set.fold (fun acc txn -> Map.remove txn acc) state.deadlines
                return! ready { state with versions = versions; deadlines = deadlines; vacuum = Some vacuum } ctx
            | _ -> return Unhandled
        | LifecycleEvent(PreStart) ->
            let vacuum = ctx.Schedule (TimeSpan.FromMinutes 1.) (retype ctx.Self) VacuumTick
            return! ready { state with vacuum = Some vacuum } ctx
        | LifecycleEvent(PostStop) ->
            state.vacuum |> Option.iter (fun c -> c.Cancel())
            return Ignore
        | _ -> return Ignore
    }

    let props () =
        let state = { versions = Map.empty; lastCommit = Map.empty; deadlines = Map.empty; vacuum = None }
        props (ready state)
/// Write transaction coordinator. Uses 2-phase commit to update all entries atomically.
module private WriteCoordinator =

    type State =
        { txn: TxnId
          total: int
          remaining: int
          replyTo: IActorRef<Message>
          partitions: Map<string,IActorRef<Message>> }

    let rec committing (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | Committed txn when state.txn = txn ->
            let remaining = state.remaining - 1
            if remaining = 0 then
                logInfof ctx "commit phase complete"
                state.replyTo <! WriteAllReply
                return Stop
            else
                logInfof ctx "received committed - remaining: %i" remaining
                return! committing { state with remaining = remaining } ctx
        // we don't handle Abort during the commit phase - we already passed the point of no return
        | _ -> return Unhandled }
    and preparing (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | Prepared txn when state.txn = txn ->
            let remaining = state.remaining - 1
            if remaining = 0 then
                logInfof ctx "prepare phase complete"
                for e in state.partitions do
                    e.Value <! Commit(state.txn, e.Key)
                return! committing { state with remaining = state.total } ctx
            else
                logInfof ctx "received prepared - remaining: %i" remaining
                return! preparing { state with remaining = remaining } ctx
        | Abort ->
            // no need to inform partitions about the aborted transaction -
            // it will never be committed and will eventually be collected by the vacuum process
            logWarning ctx "aborting WriteAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled }
/// Read transaction coordinator.
module private ReadCoordinator =

    type State =
        { replyTo: IActorRef<Message>
          partitions: Map<string,IActorRef<Message>>
          remaining: Set<string>
          result: Map<string, Record option> }

    /// In round 1 we get the most recently committed entry for each key. However, those entries may belong
    /// to different concurrent transactions. In that case we compare the round-1 timestamps and use the bloom
    /// filters to check whether any entry might have been written by an overlapping transaction that also
    /// touched other requested keys - if so, a second round fetches the matching versions.
    let private siblings (ret: Map<string, Record option>) : Map<string, TxnId list> =
        let mutable deps = Map.empty
        for e1 in ret do
            for e2 in ret do
                if e1.Key <> e2.Key then
                    match e1.Value, e2.Value with
                    | Some i1, Some i2 when i1.txnId > i2.txnId && Bloom.maybeContains e2.Key i1.filter ->
                        let ts = Map.tryFind e2.Key deps |> Option.defaultValue []
                        deps <- Map.add e2.Key (i1.txnId::ts) deps
                    | _ -> ()
        deps
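    // Illustrative example: suppose round 1 returns X written by transaction t2 and Y written by an
    // older transaction t1, and t2's bloom filter reports that it may also have written Y. Then t2 is
    // recorded as a dependency of Y, and round 2 re-reads Y asking specifically for a version written
    // by t2 - restoring read atomicity at the cost of one extra round trip.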
    let rec round1 (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadReply(key, item) ->
            let remaining = Set.remove key state.remaining
            let result = Map.add key item state.result
            if Set.isEmpty remaining then
                let siblings = siblings result
                if Map.isEmpty siblings then
                    logInfof ctx "received all entries in 1st round"
                    let reply =
                        result
                        |> Map.filter (fun k v -> Option.isSome v)
                        |> Map.map (fun k v -> v.Value.value)
                    state.replyTo <! ReadAllReply reply
                    return Stop
                else
                    logInfof ctx "siblings found - starting second round for %O" (siblings |> Map.keys |> List.ofSeq)
                    for e in siblings do
                        let partition = Map.find e.Key state.partitions
                        partition <! Read(e.Key, Array.ofList e.Value)
                    return! round2 { state with remaining = siblings |> Map.keys |> Set.ofSeq } ctx
            else
                let state = { state with remaining = remaining; result = result }
                return! round1 state ctx
        | Abort ->
            logWarning ctx "aborting ReadAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled }
    and round2 (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadReply(key, item) ->
            let remaining = Set.remove key state.remaining
            let result = Map.add key item state.result
            if Set.isEmpty remaining then
                logInfof ctx "received all entries in 2nd round"
                let reply =
                    result
                    |> Map.filter (fun k v -> Option.isSome v)
                    |> Map.map (fun k v -> v.Value.value)
                state.replyTo <! ReadAllReply reply
                return Stop
            else
                let state = { state with remaining = remaining; result = result }
                return! round2 state ctx
        | Abort ->
            logWarning ctx "aborting ReadAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled
    }
/// The service is a user-facing actor taking requests and producing responses for read/write transactions.
/// You only need one of them on each node.
[<RequireQualifiedAccess>]
module Service =

    type State =
        { id: int
          seqNr: uint64
          partitions: IActorRef<Message>[] }

    let private keyToPartition key node =
        let idx = (hash key) % node.partitions.Length
        node.partitions.[idx]
    let rec private ready (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadAll(keys, timeout) ->
            let sender = ctx.Sender()
            let partitions =
                keys
                |> Seq.map (fun key -> key, keyToPartition key state)
                |> Map.ofSeq
            let coordinator =
                let state: ReadCoordinator.State =
                    { replyTo = sender
                      partitions = partitions
                      remaining = partitions |> Map.keys |> Set.ofSeq
                      result = Map.empty }
                spawnAnonymous ctx (props (ReadCoordinator.round1 state))
            // abort the coordinator on timeout - technically we should store the cancel token and dispose it on success
            ctx.Schedule timeout coordinator Abort |> ignore
            for e in partitions do
                e.Value.Tell(Read(e.Key, [||]), untyped coordinator) // initialize first round - no deps
            return! ready state ctx
        | WriteAll(entries, timeout) ->
            let sender = ctx.Sender()
            let now = DateTime.UtcNow
            let deadline = now + timeout
            let txn = (now, state.id)
            let partitions = entries |> Map.map (fun key _ -> keyToPartition key state)
            let bloom =
                entries
                |> Map.keys
                |> Seq.fold (fun acc key -> Bloom.add key acc) Bloom.empty
            let coordinator =
                let state: WriteCoordinator.State =
                    { txn = txn
                      total = Map.count entries
                      remaining = Map.count entries
                      replyTo = sender
                      partitions = partitions }
                spawnAnonymous ctx (props (WriteCoordinator.preparing state))
            // abort the coordinator on timeout - technically we should store the cancel token and dispose it on success
            ctx.Schedule timeout coordinator Abort |> ignore
            for e in entries do
                let partition = partitions.[e.Key]
                let item = { value = e.Value; txnId = txn; filter = bloom }
                partition.Tell(Prepare(txn, deadline, e.Key, item), untyped coordinator) // init prepare phase
            return! ready state ctx
        | _ -> return Unhandled }
    /// Sets all keys to their corresponding values as a single RAMP transaction. RAMP transactions are
    /// non-interactive (unlike e.g. SQL), so there's no user-driven begin/commit step. Instead, RAMP requires
    /// that the entire batch be submitted at once.
    let writeAll (timeout: TimeSpan) (entries: Map<string,byte[]>) svc : Async<unit> = async {
        match! svc <? WriteAll(entries, timeout) with
        | WriteAllReply -> return ()
        | Aborted -> return failwith "transaction timed out"
        | other -> return failwithf "unexpected reply: %O" other
    }

    /// Gets the entries for all keys as a single RAMP read transaction.
    let readAll (timeout: TimeSpan) (keys: string list) svc : Async<Map<string,byte[]>> = async {
        match! svc <? ReadAll(Set.ofList keys, timeout) with
        | ReadAllReply r -> return r
        | Aborted -> return failwith "transaction timed out"
        | other -> return failwithf "unexpected reply: %O" other
    }
    /// Props for creating a service actor.
    let props (partitions: IActorRef<Message>[]) (id: int) : Props<Message> =
        let state =
            { id = id
              seqNr = 0UL
              partitions = partitions }
        props (ready state)
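// A minimal usage sketch, assuming an actor system `sys` already exists (the test module
// below exercises the same flow end to end):
//
//   let partitions = Array.init 3 (fun i -> retype (spawn sys $"partition-%i{i}" (Partition.props ())))
//   let svc = spawn sys "service-0" (Service.props partitions 0)
//   async {
//       do! svc |> Service.writeAll (TimeSpan.FromSeconds 5.) (Map.ofList [ "X", [|1uy|] ])
//       let! result = svc |> Service.readAll (TimeSpan.FromSeconds 5.) [ "X" ]
//       printfn "%A" result
//   } |> Async.RunSynchronously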
module Demo.Tests.RampTests

open System
open Akkling
open Expecto
open Demo.Ramp

let [<Literal>] PARTITION_COUNT = 5
let [<Literal>] SERVICE_COUNT = 5

[<Tests>]
let tests = testList "RAMP Hybrid" [
    ftestAsync "maintains transaction guarantees" {
        let timeout = TimeSpan.FromSeconds 30.
        use sys = System.create "sys" (Configuration.defaultConfig())
        let partitions = Array.init PARTITION_COUNT <| fun i -> retype (spawn sys $"partition-%i{i}" (Partition.props ()))
        let services = Array.init SERVICE_COUNT <| fun i -> spawn sys $"service-%i{i}" (Service.props partitions i)
        // step 1 - initialize entries: X=1, Y=2
        let entries = Map.ofList [
            "X", [|1uy|]
            "Y", [|2uy|]
        ]
        do! Service.writeAll timeout entries services.[0]
        // step 2 - concurrent read/write: X=3, Y=4
        let entries = Map.ofList [
            "X", [|3uy|]
            "Y", [|4uy|]
        ]
        let t1 = async {
            do! Service.writeAll timeout entries services.[1]
            return Map.empty
        }
        let t2 = Service.readAll timeout ["X";"Y"] services.[2]
        let! results = Async.Parallel [t2; t1]
        let b = results.[0] // result of the read transaction
        // assert - we received either (X=1,Y=2) or (X=3,Y=4), but never e.g. (X=3,Y=2)
        let v1 = Map.ofList [ "X", [|1uy|]; "Y", [|2uy|] ]
        let v2 = Map.ofList [ "X", [|3uy|]; "Y", [|4uy|] ]
        printfn "actual: %A" b
        Expect.isTrue (b = v1 || b = v2) "read must observe both old or both new values, never a mix"
    }
]