Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active Aug 21, 2021
Embed
What would you like to do?
RAMP Hybrid transaction protocol implementation using Akka.NET in F#
module Demo.Ramp
open System
open Akka.Actor
open Akkling
/// Transaction identifier: a `DateTime` timestamp paired with an integer node identifier.
/// In this file the service builds it from `DateTime.UtcNow` and its own id, so it is a wall-clock
/// timestamp rather than a true Lamport/logical clock. Values are ordered by F# tuple comparison
/// (timestamp first, node id as tie-breaker).
/// Original paper implementation uses combination of hybrid logical clock with node id encoded together into uint64.
type TxnId = DateTime * int
/// Bit set used as the backing storage of the Bloom filter attached to every record.
type Bloom = System.Collections.BitArray
[<RequireQualifiedAccess>]
module Map =
    /// Lazily enumerates the keys of `map`, in the map's own iteration order.
    let keys (map: Map<'k,'v>) =
        Seq.map (fun (KeyValue (key, _)) -> key) map
/// Hashes `key` with Akka's MurmurHash string hash and folds the result into the non-negative
/// range, so callers can safely use it with `%` to derive an index.
let private hash (key: string): int =
    let h = Akka.Util.MurmurHash.StringHash key
    // Math.Abs(Int32.MinValue) throws OverflowException because the magnitude is not
    // representable as Int32 - map that single value to 0, everything else is unchanged.
    if h = Int32.MinValue then 0 else Math.Abs(h)
/// Simplistic implementation of an immutable Bloom Filter. It does deep copy of underlying bit set on every insert.
[<RequireQualifiedAccess>]
module Bloom =

    /// Bloom filter size (in bits).
    let [<Literal>] private SIZE = 64

    /// Number of hashes used per key.
    let [<Literal>] private HASH_COUNT = 3

    /// Index of the bit selected by the `i`-th hash function for `key`.
    /// Hash functions are derived by appending the hash index to the key before hashing.
    let private bitIndex (key: string) (i: int) : int =
        (hash (key + string i)) % SIZE

    /// Filter with no bits set. The underlying `BitArray` is mutable and shared: never mutate it
    /// directly, always go through `add`, which works on a copy.
    let empty = System.Collections.BitArray(SIZE, false)

    /// Returns a copy of `filter` with the bits for `key` set. The input filter is left untouched.
    let add (key: string) (filter: Bloom) : Bloom =
        let copy = filter.Clone() :?> Bloom
        // F# ranges are inclusive: the upper bound must be HASH_COUNT - 1 so that exactly
        // HASH_COUNT bits are set - the same ones `maybeContains` checks.
        for i in 0 .. HASH_COUNT - 1 do
            copy.[bitIndex key i] <- true
        copy

    /// Returns `false` when `key` was definitely never added to `filter`, `true` when it may
    /// have been added (false positives are possible, false negatives are not).
    let maybeContains (key: string) (filter: Bloom) : bool =
        let rec loop (i: int) =
            if i = HASH_COUNT then true
            elif not filter.[bitIndex key i] then false
            else loop (i + 1)
        loop 0
/// Versioned data item stored by partitions, shaped for the RAMP Hybrid algorithm:
/// - RAMP Fast would carry the list of transaction keys instead of a Bloom filter: 1RTT reads in every case,
///   at the cost of heavier items (metadata size grows with the number of keys in the transaction).
/// - RAMP Small would carry no extra metadata, at the cost of always doing 2 roundtrips per read.
/// - RAMP Hybrid carries a Bloom filter: 1RTT in the happy case, 2RTT when the filter cannot give a
///   straight answer. Either way the metadata has constant size.
[<Struct>]
type Record =
    { value: byte[]   // stored payload - Last Write Wins semantics
      txnId: TxnId    // timestamp of the transaction that wrote this item
      filter: Bloom } // RAMP Hybrid bloom filter
    /// Renders the record with its Bloom filter printed as a string of '0'/'1' characters.
    override this.ToString() =
        let bits = this.filter
        let rendered = System.Text.StringBuilder()
        for idx in 0 .. bits.Length - 1 do
            rendered.Append(if bits.[idx] then '1' else '0') |> ignore
        let bf = rendered.ToString()
        sprintf "(value: %A, txn: %O, filter: %O)" this.value this.txnId bf
/// Protocol messages exchanged between the user-facing service, the read/write coordinators and the partitions.
type Message =
| ReadAll of keys:Set<string> * timeout:TimeSpan // user request for RAMP-Hybrid read transaction
| ReadAllReply of entries:Map<string,byte[]> // reply to `ReadAll`
| WriteAll of entries:Map<string,byte[]> * timeout:TimeSpan // user request for RAMP-Hybrid write transaction
| WriteAllReply // reply to `WriteAll`
| Read of key:string * deps:TxnId[] // partition entry request - transaction dependencies attached (empty on 1st round)
| ReadReply of key:string * Record option // reply to `Read`
| Prepare of TxnId * deadline:DateTime * key:string * Record // prepare phase msg for a single entry from `WriteAll` request
| Prepared of TxnId // reply to `Prepare`
| Commit of TxnId * key:string // commit phase msg for a single entry after `Prepared` was received
| Committed of TxnId // reply to `Commit`
| VacuumTick // tick used for vacuum cleaning
| Abort // abort transaction
| Aborted // sent by a coordinator to the original requester after it handled `Abort`
/// Partition manages entries for some fragment of the key-space. These entries are multi-versioned (MVCC).
/// Partitions can be dispatched on multiple nodes. In normal situation to manage their placement, one could use
/// distributed hash table (DHT) and use handover protocol as new nodes come and go, but it's not the purpose of this
/// demo.
module Partition =

    type private State =
        { versions: Map<string, Map<TxnId, Record>> // MVCC dictionary - entries are versioned by timestamp
          lastCommit: Map<string, TxnId>            // the latest committed transaction for each entry
          deadlines: Map<TxnId, DateTime>           // completion deadlines given each active transaction
          vacuum: ICancelable option }              // MVCC vacuum cleaner scheduled task

    /// Delay between two consecutive vacuum runs.
    let private vacuumInterval = TimeSpan.FromMinutes 1.

    /// Extra time granted to a transaction past its deadline, to tolerate clock drift between nodes.
    let private clockDriftGrace = TimeSpan.FromMinutes 1.

    /// Schedules the next vacuum tick. A one-shot timer is used instead of a repeating one, because
    /// we don't know how long a vacuum run takes (it could work incrementally).
    let private scheduleVacuum (ctx: Actor<obj>) : ICancelable =
        ctx.Schedule vacuumInterval (retype ctx.Self) VacuumTick

    /// Drops versions that are no longer needed and forgets deadlines of finished transactions.
    /// A version is kept when:
    /// - nothing was committed for its key yet (the writing transaction may have passed the point of no return),
    /// - it is the latest committed version of its key,
    /// - its transaction is still active, i.e. `deadline + grace` has not elapsed at `now`.
    /// Everything else (superseded or abandoned versions of expired transactions) is removed.
    /// Returns the new state together with the number of removed versions.
    let private collectGarbage (now: DateTime) (state: State) : State * int =
        let cutoff = now - clockDriftGrace
        // Every prepared version registers a deadline, so a transaction missing from this map
        // has already expired during an earlier run.
        let active = state.deadlines |> Map.filter (fun _ deadline -> deadline > cutoff)
        let prune (key: string) (versions: Map<TxnId, Record>) =
            match Map.tryFind key state.lastCommit with
            | None -> versions
            | Some latest ->
                versions |> Map.filter (fun txn _ -> txn = latest || Map.containsKey txn active)
        let countVersions (all: Map<string, Map<TxnId, Record>>) =
            all |> Map.fold (fun total _ versions -> total + Map.count versions) 0
        let versions = state.versions |> Map.map prune
        let removed = countVersions state.versions - countVersions versions
        { state with versions = versions; deadlines = active }, removed

    let rec private ready (state: State) (ctx: Actor<obj>) = actor {
        let! msg = ctx.Receive()
        let sender = ctx.Sender()
        match msg with
        | :? Message as msg ->
            match msg with
            | Read(key, deps) ->
                let reply =
                    if Array.isEmpty deps then
                        // dependencies are empty on 1st round of read gathering - just return the latest committed entry
                        match Map.tryFind key state.lastCommit, Map.tryFind key state.versions with
                        | Some ts, Some versions -> Map.tryFind ts versions
                        | _, _ -> None
                    else
                        // on 2nd round return the most recent of the specifically requested versions (if any is known)
                        let versions = Map.tryFind key state.versions |> Option.defaultValue Map.empty
                        let recentTimestamps = Map.keys versions |> Seq.sortDescending
                        let found = recentTimestamps |> Seq.tryFind (fun version -> Array.contains version deps)
                        found |> Option.map (fun version -> Map.find version versions)
                logInfof ctx "received get for '%s' with %i deps" key deps.Length
                sender <! ReadReply(key, reply)
                return! ready state ctx

            | Prepare(txn, deadline, key, value) ->
                logInfof ctx "received prepare for '%s'=%O" key value
                // store the version (not yet visible to 1st round reads) and remember the transaction deadline
                let versions = Map.tryFind key state.versions |> Option.defaultValue Map.empty
                let versions = Map.add txn value versions
                let deadlines = Map.add txn deadline state.deadlines
                let state = { state with versions = Map.add key versions state.versions; deadlines = deadlines }
                sender <! Prepared txn
                return! ready state ctx

            | Commit(txn, key) ->
                logInfof ctx "received commit for '%s' with txn id: %O" key txn
                let old = Map.tryFind key state.lastCommit |> Option.defaultValue (DateTime.MinValue, 0)
                let state =
                    if old >= txn then state // use Last Write Wins semantics
                    else { state with lastCommit = Map.add key txn state.lastCommit }
                sender <! Committed txn
                return! ready state ctx

            | VacuumTick ->
                let state, removed = collectGarbage DateTime.UtcNow state
                if removed > 0 then
                    logInfof ctx "vacuum removed %i outdated entries" removed
                let vacuum = scheduleVacuum ctx
                return! ready { state with vacuum = Some vacuum } ctx

            | _ -> return Unhandled

        | LifecycleEvent(PreStart) ->
            let vacuum = scheduleVacuum ctx
            return! ready { state with vacuum = Some vacuum } ctx

        | LifecycleEvent(PostStop) ->
            // make sure no vacuum tick is delivered after the actor is gone
            state.vacuum |> Option.iter (fun c -> c.Cancel())
            return Ignore

        | _ -> return Ignore
    }

    /// Props for creating a partition actor with empty state.
    let props () =
        let state = { versions = Map.empty; lastCommit = Map.empty; deadlines = Map.empty; vacuum = None }
        props (ready state)
/// Coordinator of a single write transaction. Drives a 2-phase commit so that all entries of the
/// transaction become visible atomically: first waits for every `Prepared`, then for every `Committed`.
module private WriteCoordinator =

    type State =
        { txn: TxnId                                  // identifier of the coordinated transaction
          total: int                                  // number of entries in the transaction
          remaining: int                              // acknowledgements still awaited in the current phase
          replyTo: IActorRef<Message>                 // original requester
          partitions: Map<string,IActorRef<Message>> } // key -> partition owning that key

    /// Commit phase: counts `Committed` acknowledgements and replies to the requester after the last one.
    let rec committing (state: State) (ctx: Actor<Message>) = actor {
        let! msg = ctx.Receive()
        match msg with
        | Committed txn when state.txn = txn ->
            match state.remaining - 1 with
            | 0 ->
                logInfof ctx "commit phase complete"
                state.replyTo <! WriteAllReply
                return Stop
            | pending ->
                logInfof ctx "received comitted - remaining: %i" pending
                return! committing { state with remaining = pending } ctx
        // Abort is deliberately not handled here - commit phase is past the point of no return
        | _ -> return Unhandled }

    /// Prepare phase: counts `Prepared` acknowledgements; after the last one sends `Commit` for every key
    /// and switches to the commit phase. Can still be aborted.
    and preparing (state: State) (ctx: Actor<Message>) = actor {
        let! msg = ctx.Receive()
        match msg with
        | Prepared txn when state.txn = txn ->
            match state.remaining - 1 with
            | 0 ->
                logInfof ctx "prepare phase complete"
                for entry in state.partitions do
                    entry.Value <! Commit(state.txn, entry.Key)
                return! committing { state with remaining = state.total } ctx
            | pending ->
                logInfof ctx "received prepared - remaining: %i" pending
                return! preparing { state with remaining = pending } ctx
        | Abort ->
            // partitions are not notified: prepared versions are never committed
            // and are eventually collected by the vacuum process
            logWarning ctx "aborting PutAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled }
/// Coordinator of a single read transaction. Gathers the latest committed entries in round 1 and, when
/// they may belong to different overlapping transactions, repairs the result in an optional round 2.
module private ReadCoordinator =

    type State =
        { replyTo: IActorRef<Message>                 // original requester
          partitions: Map<string,IActorRef<Message>>  // key -> partition owning that key
          remaining: Set<string>                      // keys for which a reply is still awaited in the current round
          result: Map<string, Record option> }        // best known record per key

    /// In round 1 we get the most recent committed entry for each key. However they may have been produced by
    /// different concurrent transactions. For every pair of keys, if the Bloom filter of one record says its
    /// transaction may also have written the other key, and the other key's record is older (or missing, e.g.
    /// prepared but not yet committed), the other key must be re-read at that transaction's timestamp.
    /// Returns, for each key to re-read, the list of transaction ids it should be fetched at.
    let private siblings (ret: Map<string, Record option>) : Map<string, TxnId list> =
        let mutable deps = Map.empty
        for e1 in ret do
            for e2 in ret do
                if e1.Key <> e2.Key then
                    match e1.Value with
                    | Some newer when Bloom.maybeContains e2.Key newer.filter ->
                        let stale =
                            match e2.Value with
                            | Some current -> newer.txnId > current.txnId
                            | None -> true // no committed version seen - treated as older than anything
                        if stale then
                            let ts = Map.tryFind e2.Key deps |> Option.defaultValue []
                            deps <- Map.add e2.Key (newer.txnId :: ts) deps
                    | _ -> ()
        deps

    /// Builds the user-facing reply: keys without a record are omitted.
    let private toReply (result: Map<string, Record option>) : Message =
        result
        |> Map.toSeq
        |> Seq.choose (fun (key, item) -> item |> Option.map (fun record -> key, record.value))
        |> Map.ofSeq
        |> ReadAllReply

    /// Round 1: collects one `ReadReply` per requested key, then either replies or starts round 2.
    let rec round1 (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadReply(key, item) ->
            let remaining = Set.remove key state.remaining
            let result = Map.add key item state.result
            if Set.isEmpty remaining then
                let missing = siblings result
                if Map.isEmpty missing then
                    logInfof ctx "received all entries in 1st round"
                    state.replyTo <! toReply result
                    return Stop
                else
                    logInfof ctx "siblings found - starting second round for %O" (missing |> Map.keys |> List.ofSeq)
                    for e in missing do
                        let partition = Map.find e.Key state.partitions
                        partition <! Read(e.Key, Array.ofList e.Value)
                    // carry over `result` as well - otherwise the reply handled just now would be lost
                    let state = { state with remaining = missing |> Map.keys |> Set.ofSeq; result = result }
                    return! round2 state ctx
            else
                let state = { state with remaining = remaining; result = result }
                return! round1 state ctx
        | Abort ->
            logWarning ctx "aborting GetAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled }

    /// Round 2: collects replies for the re-requested keys and replies to the requester. There is no 3rd round.
    and round2 (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadReply(key, item) ->
            let remaining = Set.remove key state.remaining
            // An empty reply means the partition has none of the requested versions (Bloom filter
            // false positive) - in that case the value obtained in round 1 stays valid.
            let result =
                match item with
                | Some _ -> Map.add key item state.result
                | None -> state.result
            if Set.isEmpty remaining then
                logInfof ctx "received all entries in 2nd round"
                state.replyTo <! toReply result
                return Stop
            else
                let state = { state with remaining = remaining; result = result }
                // stay in round 2 until every re-requested key has answered
                return! round2 state ctx
        | Abort ->
            logWarning ctx "aborting GetAll"
            state.replyTo <! Aborted
            return Stop
        | _ -> return Unhandled
    }
/// Service is user-facing actor taking requests and producing responses for read/write transactions.
/// You only need one of them on each node.
[<RequireQualifiedAccess>]
module Service =

    type State =
        { id: int                             // node identifier, used as part of transaction ids
          seqNr: uint64
          partitions: IActorRef<Message>[] }  // all known partitions

    /// Picks the partition responsible for `key` by hashing the key over the partition array.
    let private keyToPartition key node =
        let idx = (hash key) % node.partitions.Length
        node.partitions.[idx]

    let rec private ready (state: State) (ctx: Actor<Message>) = actor {
        match! ctx.Receive() with
        | ReadAll(keys, _) when Set.isEmpty keys ->
            // nothing to read: no partition would ever answer, so reply straight away
            let sender : IActorRef<Message> = ctx.Sender()
            sender <! ReadAllReply Map.empty
            return! ready state ctx

        | ReadAll(keys, timeout) ->
            let sender = ctx.Sender()
            let partitions =
                keys
                |> Seq.map (fun key -> key, keyToPartition key state)
                |> Map.ofSeq
            let coordinator =
                let state: ReadCoordinator.State =
                    { replyTo = sender
                      partitions = partitions
                      remaining = partitions |> Map.keys |> Set.ofSeq
                      result = Map.empty }
                spawnAnonymous ctx (props (ReadCoordinator.round1 state))
            // abort on timeout - the Abort must go to the coordinator, which owns the transaction.
            // Technically we should store the cancel token and dispose it on success.
            ctx.Schedule timeout coordinator Abort |> ignore
            for e in partitions do
                e.Value.Tell(Read(e.Key, [||]), untyped coordinator) // initialize first round - no deps
            return! ready state ctx

        | WriteAll(entries, _) when Map.isEmpty entries ->
            // nothing to write: no partition would ever acknowledge, so reply straight away
            let sender : IActorRef<Message> = ctx.Sender()
            sender <! WriteAllReply
            return! ready state ctx

        | WriteAll(entries, timeout) ->
            let sender = ctx.Sender()
            let now = DateTime.UtcNow
            let deadline = now + timeout
            let txn = (now, state.id)
            let partitions = entries |> Map.map (fun key _ -> keyToPartition key state)
            // the same filter, covering all keys of the transaction, is attached to every written record
            let bloom =
                entries
                |> Map.keys
                |> Seq.fold (fun acc key -> Bloom.add key acc) Bloom.empty
            let coordinator =
                let state: WriteCoordinator.State =
                    { txn = txn
                      total = Map.count entries
                      remaining = Map.count entries
                      replyTo = sender
                      partitions = partitions }
                spawnAnonymous ctx (props (WriteCoordinator.preparing state))
            // abort on timeout - the Abort must go to the coordinator, which owns the transaction.
            // Technically we should store the cancel token and dispose it on success.
            ctx.Schedule timeout coordinator Abort |> ignore
            for e in entries do
                let partition = partitions.[e.Key]
                let item = { value = e.Value; txnId = txn; filter = bloom }
                partition.Tell(Prepare(txn, deadline, e.Key, item), untyped coordinator) // init prepare phase
            return! ready state ctx

        | _ -> return Unhandled }

    /// Set all keys to corresponding values as a single RAMP transaction. RAMP transactions are non-interactive (unlike
    /// eg. SQL), so there's no user-driven begin/commit step. Instead, RAMP requires that entire batch is submitted
    /// at once. Fails with an exception when the transaction is aborted or the service answers unexpectedly.
    let writeAll (timeout: TimeSpan) (entries: Map<string,byte[]>) svc : Async<unit> = async {
        match! svc <? WriteAll(entries, timeout) with
        | WriteAllReply -> return ()
        | Aborted -> return failwith "transaction timed out"
        | other -> return failwithf "unexpected reply to WriteAll: %A" other
    }

    /// Get entries for all keys as a single RAMP read transaction. Keys without a value are absent from the result.
    /// Fails with an exception when the transaction is aborted or the service answers unexpectedly.
    let readAll (timeout: TimeSpan) (keys: string list) svc : Async<Map<string,byte[]>> = async {
        match! svc <? ReadAll(Set.ofList keys, timeout) with
        | ReadAllReply r -> return r
        | Aborted -> return failwith "transaction timed out"
        | other -> return failwithf "unexpected reply to ReadAll: %A" other
    }

    /// Props for creating service actor.
    let props (partitions: IActorRef<Message>[]) (id: int) : Props<Message> =
        let state =
            { id = id
              seqNr = 0UL
              partitions = partitions }
        props (ready state)
module Demo.Tests.RampTests
open System
open Akkling
open Expecto
open Demo.Ramp
/// Number of partition actors started for the test.
let [<Literal>] PARTITION_COUNT = 5
/// Number of service actors started for the test.
let [<Literal>] SERVICE_COUNT = 5

[<Tests>]
let tests = testList "RAMP Hybrid" [
    // regular (non-focused) test, so that it doesn't silently skip any other tests in the assembly
    testAsync "maintains transaction guarantees" {
        let timeout = TimeSpan.FromSeconds 30.
        use sys = System.create "sys" (Configuration.defaultConfig())
        let partitions = Array.init PARTITION_COUNT <| fun i -> retype (spawn sys $"partition-%i{i}" (Partition.props ()))
        let services = Array.init SERVICE_COUNT <| fun i -> spawn sys $"service-%i{i}" (Service.props partitions i)

        // step 1 - initialize entries: X=1, Y=2
        let entries = Map.ofList [
            "X", [|1uy|]
            "Y", [|2uy|]
        ]
        do! Service.writeAll timeout entries services.[0]

        // step 2 - concurrent get/put: X=3, Y=4
        let entries = Map.ofList [
            "X", [|3uy|]
            "Y", [|4uy|]
        ]
        let t1 = async {
            do! Service.writeAll timeout entries services.[1]
            return Map.empty // only here to give both computations the same result type
        }
        let t2 = Service.readAll timeout ["X";"Y"] services.[2]
        // results come back in input order: index 0 is the read, index 1 the (ignored) write
        let! results = Async.Parallel [t2; t1]
        let b = results.[0]

        // assert - we received either (X=1,Y=2) or (X=3,Y=4), but never (X=3,Y=2)
        let v1 = Map.ofList [ "X", [|1uy|]; "Y", [|2uy|] ]
        let v2 = Map.ofList [ "X", [|3uy|]; "Y", [|4uy|] ]
        printfn "actual: %A" b
        Expect.isTrue (b = v1 || b = v2) ""
    }
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment