Last active
May 17, 2024 15:12
-
-
Save Horusiath/fd2ad1527c3cdc42bb9afcb121c85233 to your computer and use it in GitHub Desktop.
Hash ring implementations in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Demo.HashRings | |
open System | |
open System.Collections.Generic | |
/// A range of the int keyspace written as (start, end]: `start` is exclusive and `end` is
/// inclusive. When start > end the range wraps around the end of the keyspace, and when
/// start = end it covers the whole keyspace.
type Range = System.ValueTuple<int, int>
[<RequireQualifiedAccess>]
module Range =

    /// Returns a range that covers the entire keyspace. Any range whose start equals its end
    /// is full; this is the canonical value used for it.
    let full: Range = struct(0, 0)

    /// Checks if the range is full, i.e. its start equals its end, so it covers the entire keyspace.
    let inline isFull (struct(s, e): Range) = s = e

    /// Checks if the range is inner, i.e. the space it covers does not wrap around the end of the
    /// int keyspace. Example: (1,5] is an inner range, while (5000,1] is an outer range, as it covers
    /// everything above 5000 and then continues from Int32.MinValue up to 1. Full ranges are not inner.
    let inline isInner (struct(s, e): Range) = s < e

    /// Checks if `value` is contained within the range: s < value <= e for inner ranges and
    /// value > s || value <= e otherwise (which is always true for a full range).
    let contains value (struct(s, e): Range) =
        if s < e then value > s && value <= e else value > s || value <= e

    /// Checks if two ranges share at least one value. Ranges that only touch each other,
    /// like (1,5] and (5,9], do NOT overlap.
    let overlap (struct(s1, e1): Range) (struct(s2, e2): Range) =
        // Two ranges on a ring intersect iff one of them contains the first value of the other.
        // That first value is start + 1, because start is exclusive. Int arithmetic is unchecked,
        // so Int32.MaxValue + 1 wraps to Int32.MinValue, just like the ring does.
        contains (s2 + 1) (struct(s1, e1)) || contains (s1 + 1) (struct(s2, e2))

    /// Merges two inner ranges (s1,e1] and (s2,e2]. The union is a single range only if they
    /// overlap or touch: (1,5] & (5,6] => (1,6], while (1,2] & (5,6] cannot be merged.
    let private mergeInner (s1: int) (e1: int) (s2: int) (e2: int) : Range option =
        if s2 <= e1 && s1 <= e2 then Some (struct(min s1 s2, max e1 e2))
        else None

    /// Merges two outer ranges (s1,e1] and (s2,e2]. Both contain the keyspace wrap-around point, so
    /// they always intersect. If together they also span the gap between them, the union is full:
    /// (5,1] & (4,0] => (4,1], but (5,3] & (2,1] => full.
    let private mergeOuter (s1: int) (e1: int) (s2: int) (e2: int) : Range option =
        let s = min s1 s2
        let e = max e1 e2
        if e >= s then Some full else Some (struct(s, e))

    /// Merges an outer range (s1,e1] with an inner range (s2,e2].
    let private mergeWithInner (s1: int) (e1: int) (s2: int) (e2: int) : Range option =
        if e2 < s1 && s2 > e1 then
            // the inner range lies entirely in the gap (e1,s1] not covered by the outer one
            None
        elif e2 >= s1 then
            // the inner range reaches the start of the outer one: (5,0] & (1,5] => (1,0]
            if s2 <= e1 then Some full // ...and its end too: (5,1] & (0,6] => full
            else Some (struct(min s1 s2, e1))
        else
            // the inner range reaches only the end of the outer one: (5,0] & (0,3] => (5,3]
            Some (struct(s1, max e1 e2))

    /// Attempts to merge two ranges together. Returns the merged range, or None if the ranges
    /// neither overlap nor touch each other (so their union is not a single range).
    let tryMerge (struct(s1, e1): Range) (struct(s2, e2): Range) : Range option =
        if s1 = e1 || s2 = e2 || (s1 = e2 && s2 = e1) then Some full
        elif s1 < e1 && s2 < e2 then mergeInner s1 e1 s2 e2 // both ranges are inner
        elif s2 < e2 then mergeWithInner s1 e1 s2 e2        // (s1,e1] is the outer range
        elif s1 < e1 then mergeWithInner s2 e2 s1 e1        // (s2,e2] is the outer range
        else mergeOuter s1 e1 s2 e2                         // both ranges are outer

    /// Merges two ranges together.
    /// Fails with an exception if the ranges neither overlap nor touch each other.
    let merge (a: Range) (b: Range) =
        match tryMerge a b with
        | Some c -> c
        | None ->
            let struct(s1, e1) = a
            let struct(s2, e2) = b
            failwithf "Ranges (%i,%i] and (%i,%i] are not adjacent" s1 e1 s2 e2
/// The part of the keyspace that a partition is responsible for: either a single continuous range
/// or a set of range fragments. Operations such as `RingRange.isFull` expect fragments to be
/// ordered by their start position.
[<Struct>]
type RingRange =
    /// A single continuous range.
    | Continuous of continuous: Range
    /// Multiple range fragments.
    | Fragmented of fragments: Range array
[<RequireQualifiedAccess>]
module RingRange =

    /// Returns a `RingRange` that covers the entire available keyspace.
    let full: RingRange = Continuous Range.full

    /// Exclusive start of a range.
    let private startOf (struct(s, _): Range) = s

    /// Inclusive end of a range.
    let private endOf (struct(_, e): Range) = e

    /// Checks if given `value` can be found within current ring range.
    let contains value (r: RingRange) =
        match r with
        | Continuous r -> Range.contains value r
        | Fragmented rs -> rs |> Array.exists (Range.contains value)

    /// Checks if the ring range covers the entire keyspace.
    /// Fragments are expected to be sorted by start position. They cover the keyspace if one of them
    /// is full, or if each fragment ends exactly where the next one starts and the last one ends
    /// where the first one starts.
    let isFull r =
        match r with
        | Continuous r -> Range.isFull r
        | Fragmented [||] -> false
        | Fragmented rs ->
            let chained = rs |> Array.pairwise |> Array.forall (fun (a, b) -> endOf a = startOf b)
            // the whole chain must hold AND it must close the ring
            rs |> Array.exists Range.isFull
            || (chained && endOf (Array.last rs) = startOf rs.[0])

    /// Compacts range fragments, merging overlapping neighbours together.
    /// Assumes there is at least one fragment and that fragments are sorted by their start position.
    /// NOTE(review): an overlap between the last and the first fragment (across the keyspace
    /// wrap-around) is not detected here.
    let private compact (fragments: Range[]) =
        let result = ResizeArray()
        let mutable latest = fragments.[0]
        for i = 1 to fragments.Length - 1 do
            let next = fragments.[i]
            if Range.overlap latest next then
                latest <- Range.merge latest next
            else
                result.Add latest
                latest <- next
        // the last accumulated range is never flushed inside the loop
        result.Add latest
        if result |> Seq.exists Range.isFull then Continuous Range.full
        elif result.Count = 1 then Continuous result.[0]
        else Fragmented (result.ToArray())

    /// Creates a ring range out of fragments sorted by their start position.
    /// A single fragment becomes a continuous range; overlapping neighbours are merged together.
    let create (ranges: Range[]) =
        match ranges with
        | [||] -> Fragmented [||]
        | [| r |] -> Continuous r
        | rs ->
            let hasOverlaps = rs |> Array.pairwise |> Array.exists (fun (a, b) -> Range.overlap a b)
            if hasOverlaps then compact rs else Fragmented rs

    /// Checks if two ring ranges have overlapping parts.
    let overlap a b =
        match a, b with
        | Continuous r1, Continuous r2 -> Range.overlap r1 r2
        | Continuous r, Fragmented rs
        | Fragmented rs, Continuous r -> rs |> Array.exists (Range.overlap r)
        | Fragmented rs1, Fragmented rs2 ->
            rs1 |> Array.exists (fun r -> rs2 |> Array.exists (Range.overlap r))

    /// Merges two ring ranges together, keeping fragments sorted by their start position.
    let merge a b =
        match a, b with
        | Continuous r1, Continuous r2 ->
            Range.tryMerge r1 r2
            |> Option.map Continuous
            |> Option.defaultWith (fun () -> Fragmented (Array.sort [| r1; r2 |]))
        | Continuous r, Fragmented rs
        | Fragmented rs, Continuous r ->
            // insert `r` right AFTER the last fragment starting before it, so the order is kept
            let idx =
                rs
                |> Array.tryFindIndexBack (fun f -> startOf f < startOf r)
                |> Option.map ((+) 1)
                |> Option.defaultValue 0
            let rs = ResizeArray(rs)
            rs.Insert(idx, r)
            create (rs.ToArray())
        | Fragmented rs1, Fragmented rs2 ->
            Array.append rs1 rs2
            |> Array.sortBy startOf
            |> create
/// Consistent hash ring splits the available keyspace into continuous ranges, one per partition.
/// Partitions are kept sorted by their hash, and each one is responsible for the range
/// (hash of its predecessor, its own hash].
///
/// Whenever a new partition is added, it takes over the part of the range that was previously
/// managed by the partition following it on the ring.
///
/// Whenever a partition is removed, its range is given back to the partition following it.
type ConsistentHashRing =
    {
        /// Part of the keyspace that `myself` is responsible for.
        range: RingRange
        /// Hash of `myself`.
        hash: int
        /// Name of the partition from whose perspective this ring is seen.
        myself: string
        /// Known partitions as (hash, name) pairs, sorted by hash.
        partitions: (struct (int * string)) array
    }
[<RequireQualifiedAccess>]
module ConsistentHashRing =

    /// Computes a consistent hash of a given string (Akka's MurmurHash string hash).
    let inline hash (partition: string) = Akka.Util.MurmurHash.StringHash(partition)

    /// Creates a new instance of `ConsistentHashRing` bound to the perspective of the provided
    /// partition. Being the only partition, it is responsible for the entire keyspace.
    let create (myself: string) =
        let h = hash myself
        { range = RingRange.full
          hash = h
          myself = myself
          partitions = [| struct(h, myself) |] }

    /// Returns the partition responsible for a given consistent `hash`: the first partition (in
    /// ascending hash order) whose hash is greater than or equal to it. If there is none, it wraps
    /// around to the first partition. Returns None only for a ring without partitions.
    let tryFindByHash hash ring =
        ring.partitions
        |> Array.tryPick (fun (struct(h, p)) -> if h >= hash then Some p else None)
        |> Option.orElseWith (fun () ->
            ring.partitions |> Array.tryHead |> Option.map (fun (struct(_, p)) -> p))

    /// Adds a `partition` to a `ring` and returns the updated ring.
    /// If a partition with the same hash is already present, the ring is returned unchanged.
    /// If the new partition becomes the direct predecessor of `myself`, the range managed by the
    /// ring shrinks to (new partition hash, my hash].
    let add partition ring =
        let hash = hash partition
        if ring.partitions |> Array.exists (fun (struct(h, _)) -> h = hash) then ring
        else
            let selfIdx = ring.partitions |> Array.findIndex (fun (struct(_, p)) -> p = ring.myself)
            // insertion position that keeps partitions sorted by hash
            let idx =
                ring.partitions
                |> Array.tryFindIndexBack (fun (struct(h, _)) -> h < hash)
                |> Option.map ((+) 1)
                |> Option.defaultValue 0
            let partitions = Array.insertAt idx (struct(hash, partition)) ring.partitions
            let ring = { ring with partitions = partitions }
            // The new partition is my predecessor if it was inserted right before me, or if
            // I'm the first partition and it became the last one (keyspace wrap-around).
            if selfIdx = idx || (selfIdx = 0 && idx = partitions.Length - 1) then
                { ring with range = Continuous (struct(hash, ring.hash)) }
            else ring

    /// Removes a `partition` from a `ring` and returns the updated ring.
    /// Removing a partition that is not part of the ring returns the ring unchanged.
    /// If the removed partition was the direct predecessor of `myself`, the range managed by the ring
    /// grows to (new predecessor hash, my hash], or to the full keyspace when `myself` is left alone.
    let remove partition ring =
        match ring.partitions |> Array.tryFindIndex (fun (struct(_, p)) -> p = partition) with
        | None -> ring
        | Some removeIdx ->
            let partitions = Array.removeAt removeIdx ring.partitions
            let ring = { ring with partitions = partitions }
            // NOTE(review): removing `myself` makes this lookup throw KeyNotFoundException - confirm intended
            let selfIdx = partitions |> Array.findIndex (fun (struct(_, p)) -> p = ring.myself)
            // the removed partition was right before me, or I'm first and it was the last one
            let wasPredecessor = removeIdx = selfIdx || (selfIdx = 0 && removeIdx = partitions.Length)
            if wasPredecessor then
                let range =
                    if partitions.Length = 1 then RingRange.full
                    else
                        // the predecessor of the first partition is the last one (wrap-around)
                        let predecessorIdx = (selfIdx - 1 + partitions.Length) % partitions.Length
                        let struct(predecessorHash, _) = partitions.[predecessorIdx]
                        Continuous (struct(predecessorHash, ring.hash))
                { ring with range = range }
            else ring
/// Virtual bucket ring: every partition places `bucketsPerPartition` buckets on the ring, at hashes
/// derived from its name, and owns the range (previous bucket, own bucket] of each of them.
type VirtualBucketRing =
    {
        /// Hash of `myself`.
        hash: int
        /// Name of the partition from whose perspective this ring is seen.
        myself: string
        /// Part of the keyspace that `myself` is responsible for.
        range: RingRange
        /// Number of buckets every partition tries to place on the ring.
        bucketsPerPartition: int
        /// Bucket hash -> name of the partition owning that bucket.
        buckets: Map<int, string>
    }
[<RequireQualifiedAccess>]
module VirtualBucketRing =

    /// Computes a consistent hash of a given string (Akka's MurmurHash string hash).
    let inline hash (partition: string) = Akka.Util.MurmurHash.StringHash(partition)

    /// Returns the owner of the first bucket (in ascending key order) whose key is greater than or
    /// equal to `hash`. If there is none, it wraps around to the owner of the first bucket.
    /// Returns None only when there are no buckets.
    let tryFindByHash hash ring =
        let owners = ring.buckets |> Map.toSeq
        match owners |> Seq.tryFind (fun (key, _) -> key >= hash) with
        | Some (_, owner) -> Some owner
        | None -> owners |> Seq.tryHead |> Option.map snd

    /// Bucket positions of a partition: hashes of its name suffixed with 0 .. count-1.
    let private uniformHashes value count : int[] =
        Array.init count (fun i -> hash (value + string i))

    /// Computes the ring range owned by `partition`: a fragment (previous key, key] for each of its
    /// buckets, with the first bucket closing the ring from the last key. An empty map means the
    /// entire keyspace.
    let private update partition (buckets: Map<int,string>) =
        if Map.isEmpty buckets then RingRange.full
        else
            let entries = Map.toArray buckets // ascending key order
            let (firstKey, firstOwner) = entries.[0]
            let (lastKey, _) = entries.[entries.Length - 1]
            let owned =
                entries
                |> Array.pairwise
                |> Array.choose (fun ((prevKey, _), (key, owner)) ->
                    if owner = partition then Some (struct(prevKey, key)) else None)
            // keyspace wrap-around: the first bucket covers everything after the last key
            let fragments =
                if firstOwner = partition then Array.append owned [| struct(lastKey, firstKey) |]
                else owned
            RingRange.create fragments

    /// Places the buckets of `partition` on the ring, unless it already owns any. On a bucket hash
    /// collision, the partition with the ordinally smaller name keeps the bucket.
    let add partition ring =
        if ring.buckets |> Map.exists (fun _ owner -> owner = partition) then ring
        else
            let buckets =
                (ring.buckets, uniformHashes partition ring.bucketsPerPartition)
                ||> Array.fold (fun acc h ->
                    match Map.tryFind h acc with
                    | Some other when partition > other -> acc
                    | _ -> Map.add h partition acc)
            { ring with range = update ring.myself buckets; buckets = buckets }

    /// Removes the buckets owned by `partition` from the ring. Unknown partitions are ignored.
    /// NOTE(review): a bucket that `partition` won in a hash collision is dropped, not handed back to
    /// the losing partition - confirm this is acceptable.
    let remove partition ring =
        if not (ring.buckets |> Map.exists (fun _ owner -> owner = partition)) then ring
        else
            let buckets =
                (ring.buckets, uniformHashes partition ring.bucketsPerPartition)
                ||> Array.fold (fun acc h ->
                    match Map.tryFind h acc with
                    | Some owner when owner = partition -> Map.remove h acc
                    | _ -> acc)
            { ring with range = update ring.myself buckets; buckets = buckets }

    /// Creates a ring seen from the perspective of `myself`, with its own buckets placed on it.
    /// Being the only partition, it is responsible for the entire keyspace.
    let create bucketsPerPartition myself =
        let empty =
            { hash = hash myself
              myself = myself
              range = RingRange.full
              buckets = Map.empty
              bucketsPerPartition = bucketsPerPartition }
        let ring = add myself empty
        { ring with range = RingRange.full }
module Tests =
    open Expecto

    /// Expecto tests for ranges and both hash ring implementations.
    /// Note: `Expect.equal` takes the ACTUAL value first and the EXPECTED value second.
    [<Tests>]
    let rangeTests =
        testList "Dht" [
            test "Range full check" {
                Expect.isTrue (Range.isFull Range.full) "full range should be full"
                Expect.isFalse (Range.isFull (struct(0, 1))) "(0,1] should not be full"
                Expect.isFalse (Range.isFull (struct(1, 0))) "(1,0] should not be full"
                Expect.isFalse (Range.isFull (struct(2, 5))) "(2,5] should not be full"
                Expect.isFalse (Range.isFull (struct(5, 2))) "(5,2] should not be full"
            }

            test "Range contains" {
                let r = struct(1, 5)
                Expect.isFalse (Range.contains 0 r) "(1,5] should not contain 0"
                Expect.isFalse (Range.contains 1 r) "(1,5] should not contain 1"
                Expect.isTrue (Range.contains 2 r) "(1,5] should contain 2"
                Expect.isTrue (Range.contains 4 r) "(1,5] should contain 4"
                Expect.isTrue (Range.contains 5 r) "(1,5] should contain 5"
                Expect.isFalse (Range.contains 6 r) "(1,5] should not contain 6"
            }

            test "Range contains outer" {
                let r = struct(5, 1)
                Expect.isFalse (Range.contains 4 r) "(5,1] should not contain 4"
                Expect.isFalse (Range.contains 5 r) "(5,1] should not contain 5"
                Expect.isTrue (Range.contains 6 r) "(5,1] should contain 6"
                Expect.isTrue (Range.contains 1000 r) "(5,1] should contain 1000"
                Expect.isTrue (Range.contains 0 r) "(5,1] should contain 0"
                Expect.isTrue (Range.contains 1 r) "(5,1] should contain 1"
                Expect.isFalse (Range.contains 2 r) "(5,1] should not contain 2"
            }

            test "Range overlaps" {
                let r = struct(1, 5)
                Expect.isFalse (Range.overlap r (struct(5, 1))) "Ranges (1,5] and (5,1] should not overlap"
                Expect.isFalse (Range.overlap r (struct(0, 1))) "Ranges (1,5] and (0,1] should not overlap"
                Expect.isFalse (Range.overlap r (struct(5, 100))) "Ranges (1,5] and (5,100] should not overlap"
                Expect.isTrue (Range.overlap r (struct(0, 2))) "Ranges (1,5] and (0,2] should overlap"
                Expect.isTrue (Range.overlap r (struct(2, 4))) "Ranges (1,5] and (2,4] should overlap"
                Expect.isTrue (Range.overlap r (struct(4, 1))) "Ranges (1,5] and (4,1] should overlap"
                Expect.isTrue (Range.overlap r (struct(1, 6))) "Ranges (1,5] and (1,6] should overlap"
            }

            test "Range merge" {
                let r = struct(1, 5)
                Expect.equal (Range.merge r (struct(5, 1))) Range.full "(1,5] + (5,1] should cover the full keyspace"
                Expect.equal (Range.merge r (struct(0, 6))) (struct(0, 6)) "(1,5] + (0,6] should be (0,6]"
                Expect.equal (Range.merge r (struct(1, 6))) (struct(1, 6)) "(1,5] + (1,6] should be (1,6]"
                Expect.equal (Range.merge r (struct(2, 6))) (struct(1, 6)) "(1,5] + (2,6] should be (1,6]"
                Expect.equal (Range.merge r (struct(5, 6))) (struct(1, 6)) "(1,5] + (5,6] should be (1,6]"
                Expect.equal (Range.merge r (struct(5, 0))) (struct(1, 0)) "(1,5] + (5,0] should be (1,0]"
                let r = struct(5, 1)
                Expect.equal (Range.merge r (struct(1, 5))) Range.full "(5,1] + (1,5] should cover the full keyspace"
                Expect.equal (Range.merge r (struct(1, 6))) Range.full "(5,1] + (1,6] should cover the full keyspace"
                Expect.equal (Range.merge r (struct(7, 0))) (struct(5, 1)) "(5,1] + (7,0] should be (5,1]"
                Expect.equal (Range.merge r (struct(4, 0))) (struct(4, 1)) "(5,1] + (4,0] should be (4,1]"
                Expect.equal (Range.merge r (struct(1, 4))) (struct(5, 4)) "(5,1] + (1,4] should be (5,4]"
            }

            test "Consistent hash ring" {
                let a = ConsistentHashRing.create "A"
                let b = ConsistentHashRing.create "B"
                Expect.isTrue (RingRange.isFull a.range) "Ring A should start full"
                Expect.isTrue (RingRange.isFull b.range) "Ring B should start full"
                // connect nodes
                let a = ConsistentHashRing.add "B" a
                let b = ConsistentHashRing.add "A" b
                Expect.isFalse (RingRange.overlap a.range b.range) "Ranges of responsibilities for A and B should not overlap"
                Expect.isTrue (RingRange.isFull (RingRange.merge a.range b.range)) "Ranges of A and B should cover full keyspace"
                // ensure that we get consistent responses, no matter which side we ask
                for value in ["hello"; "world"] do
                    let hash = ConsistentHashRing.hash value
                    let ka = ConsistentHashRing.tryFindByHash hash a
                    let kb = ConsistentHashRing.tryFindByHash hash b
                    Expect.equal ka kb (sprintf "Both rings A and B should place '%s' on the same partition" value)
            }

            test "Virtual bucket ring" {
                let a = VirtualBucketRing.create 10 "A"
                let b = VirtualBucketRing.create 10 "B"
                Expect.isTrue (RingRange.isFull a.range) "Ring A should start full"
                Expect.isTrue (RingRange.isFull b.range) "Ring B should start full"
                // connect nodes
                let a = VirtualBucketRing.add "B" a
                let b = VirtualBucketRing.add "A" b
                Expect.isFalse (RingRange.overlap a.range b.range) "Ranges of responsibilities for A and B should not overlap"
                Expect.isTrue (RingRange.isFull (RingRange.merge a.range b.range)) "Ranges of A and B should cover full keyspace"
                // ensure that we get consistent responses, no matter which side we ask
                for value in ["hello"; "world"] do
                    let hash = VirtualBucketRing.hash value
                    let ka = VirtualBucketRing.tryFindByHash hash a
                    let kb = VirtualBucketRing.tryFindByHash hash b
                    Expect.equal ka kb (sprintf "Both rings A and B should place '%s' on the same partition" value)
            }
        ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment