Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active May 11, 2022 04:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/0ae784326ef12b9cd075f3d59288ca19 to your computer and use it in GitHub Desktop.
Save Horusiath/0ae784326ef12b9cd075f3d59288ca19 to your computer and use it in GitHub Desktop.
Consistent hash ring and virtual bucket hash ring implementations
module Demo.Dht
open System
open System.Collections.Generic
/// A half-open arc (s,e] of the int hash ring: `s` is the start (exclusive) and `e` is the
/// end (inclusive). When `s > e` the arc wraps past Int32.MaxValue back to Int32.MinValue;
/// when `s = e` the `Range` module treats it as covering the entire keyspace.
type Range = struct (int * int)
[<RequireQualifiedAccess>]
module Range =

    /// Range that covers the entire keyspace. Any range whose start equals its end is full;
    /// (0,0] is the canonical value returned by the functions in this module.
    let full: Range = struct(0, 0)

    /// Checks if the range is full, meaning it covers the entire keyspace (start = end).
    let inline isFull (struct(s, e): Range) = s = e

    /// Checks if given range is inner (start < end), i.e. the space it covers does not wrap around
    /// the end of the keyspace. Example: (1,5] is an inner range, while (5000,1] is an outer range,
    /// as the area it covers goes past Int32.MaxValue and continues from Int32.MinValue.
    let inline isInner (struct(s, e): Range) = s < e

    /// Checks if given value is contained within the range (start exclusive, end inclusive).
    /// Outer ranges contain values above their start or at/below their end; full ranges contain everything.
    let contains value (struct(s, e): Range) =
        if s < e then value > s && value <= e else value > s || value <= e

    /// Checks if two ranges share at least one key. Ranges that only touch at a boundary,
    /// like (1,5] and (5,100], do NOT overlap.
    let overlap (struct(s1, e1): Range) (struct(s2, e2): Range) =
        // The first key of (s,e] is s + 1 because the start is exclusive. Two arcs of a ring
        // intersect iff the first key of one of them lies inside the other. int addition is
        // unchecked, so Int32.MaxValue + 1 wraps to Int32.MinValue - the next key on the ring.
        contains (s2 + 1) (struct(s1, e1)) || contains (s1 + 1) (struct(s2, e2))

    /// Merges two inner ranges; None when they neither overlap nor touch.
    let private mergeInner (s1: int) (e1: int) (s2: int) (e2: int) : Range option =
        // Inner ranges never include the wrap point, so their union can never be full.
        if s1 <= e2 && s2 <= e1 then Some (struct(Math.Min(s1, s2), Math.Max(e1, e2)))
        else None

    /// Merges two outer ranges. Both contain the wrap point, so they always overlap.
    let private mergeOuter (s1: int) (e1: int) (s2: int) (e2: int) : Range option =
        let s = Math.Min(s1, s2)
        let e = Math.Max(e1, e2)
        // once the merged end reaches the merged start no gap is left: (10,5] & (3,2] => full
        if s <= e then Some full else Some (struct(s, e))

    /// Merges outer range (so,eo] with inner range (si,ei]; None when the inner range sits strictly
    /// inside the uncovered gap (eo,so] without touching either of its boundaries.
    let private mergeWithInner (so: int) (eo: int) (si: int) (ei: int) : Range option =
        let reachesStart = ei >= so // joins at the outer start: (5,0] & (1,5] => (1,0]
        let reachesEnd = si <= eo   // joins at the outer end:   (5,0] & (0,3] => (5,3]
        match reachesStart, reachesEnd with
        | false, false -> None
        | true, true -> Some full   // inner range bridges the whole gap: (5,1] & (1,6] => full
        | true, false -> Some (struct(Math.Min(so, si), eo))
        | false, true -> Some (struct(so, Math.Max(eo, ei)))

    /// Attempts to merge two ranges together, returning the merged range, or None when
    /// the ranges neither overlap nor touch each other.
    let tryMerge (struct(s1, e1): Range) (struct(s2, e2): Range) : Range option =
        if s1 = e1 || s2 = e2 || (s1 = e2 && s2 = e1) then Some full
        else
            match s1 < e1, s2 < e2 with
            | true, true -> mergeInner s1 e1 s2 e2
            | false, true -> mergeWithInner s1 e1 s2 e2  // (s1,e1] is the outer range
            | true, false -> mergeWithInner s2 e2 s1 e1  // (s2,e2] is the outer range
            | false, false -> mergeOuter s1 e1 s2 e2

    /// Merges two ranges together.
    /// Raises an exception (via `failwithf`) when the ranges neither overlap nor touch.
    let merge (a: Range) (b: Range) =
        match tryMerge a b with
        | Some c -> c
        | None ->
            let struct(s1, e1) = a
            let struct(s2, e2) = b
            failwithf "Ranges (%i,%i] and (%i,%i] are not adjacent" s1 e1 s2 e2
/// Part of the keyspace a partition may be responsible for: either one continuous range
/// or a collection of range fragments.
[<Struct>]
type RingRange =
    /// A single range of keys (it may wrap around the end of the keyspace).
    | Continuous of continuous: Range
    /// Multiple ranges; helpers in the `RingRange` module expect them sorted by start position.
    | Fragmented of fragments: Range array
[<RequireQualifiedAccess>]
module RingRange =

    /// Returns a `RingRange` that covers entire available keyspace.
    let full: RingRange = Continuous Range.full

    /// Checks if given `value` can be found within current ring range.
    let contains value (r: RingRange) =
        match r with
        | Continuous r -> Range.contains value r
        | Fragmented rs -> rs |> Array.exists (Range.contains value)

    /// Checks if the ring range covers the entire keyspace.
    /// A fragmented range is full when one of its fragments is full, or when its fragments
    /// (sorted by start) form an unbroken chain - each fragment ends where the next one starts -
    /// and the last fragment ends where the first one starts.
    let isFull r =
        match r with
        | Continuous r -> Range.isFull r
        | Fragmented [||] -> false
        | Fragmented rs ->
            let chained =
                rs
                |> Array.pairwise
                |> Array.forall (fun (struct(_, e), struct(s, _)) -> e = s)
            let struct(firstStart, _) = rs.[0]
            let struct(_, lastEnd) = rs.[rs.Length - 1]
            (chained && lastEnd = firstStart) || Array.exists Range.isFull rs

    /// Compacts range fragments, merging overlapping neighbours together.
    /// Assumes the fragments are sorted by start position and that there is at least one.
    /// Fragments which only touch (e.g. (1,5] and (5,9]) are kept apart, as `Range.overlap`
    /// does not consider them overlapping.
    let private compact (fragments: Range[]) =
        let result = ResizeArray<Range>()
        let mutable latest = fragments.[0]
        for i = 1 to fragments.Length - 1 do
            let next = fragments.[i]
            if Range.overlap latest next then
                latest <- Range.merge latest next
            else
                result.Add latest
                latest <- next
        // The last fragment may be an outer range wrapping past the end of the keyspace
        // and overlapping fragments from the beginning of the array.
        while result.Count > 0 && Range.overlap latest result.[0] do
            latest <- Range.merge latest result.[0]
            result.RemoveAt 0
        // the accumulated fragment must not be lost
        result.Add latest
        if result.Count = 1 then
            if Range.isFull result.[0] then full else Continuous result.[0]
        else Fragmented (result.ToArray())

    /// Creates a ring range out of fragments sorted by their start position.
    /// No fragments give an empty `Fragmented` range, a single fragment gives `Continuous`,
    /// otherwise overlapping fragments are merged together.
    let create (ranges: Range[]) =
        match ranges with
        | [||] -> Fragmented [||]
        | [|r|] -> Continuous r
        | rs -> compact rs

    /// Checks if two ring ranges share at least one key.
    let overlap a b =
        match a, b with
        | Continuous r1, Continuous r2 -> Range.overlap r1 r2
        | Continuous r, Fragmented rs
        | Fragmented rs, Continuous r -> rs |> Array.exists (Range.overlap r)
        | Fragmented rs1, Fragmented rs2 ->
            rs1 |> Array.exists (fun r -> rs2 |> Array.exists (Range.overlap r))

    /// Sorts fragments by their start position, as expected by `create`.
    let private sortByStart (rs: Range[]) =
        rs |> Array.sortBy (fun (struct(s, _)) -> s)

    /// Merges two ring ranges together.
    let merge a b =
        match a, b with
        | Continuous r1, Continuous r2 ->
            (match Range.tryMerge r1 r2 with
             | Some r -> Continuous r
             | None -> Fragmented (Array.sort [| r1; r2 |]))
        | Continuous r, Fragmented rs
        | Fragmented rs, Continuous r ->
            // append and re-sort, so the new fragment lands after every fragment with a smaller start
            Array.append rs [| r |] |> sortByStart |> create
        | Fragmented rs1, Fragmented rs2 ->
            Array.append rs1 rs2 |> sortByStart |> create
/// Consistent hash ring, which splits the available keyspace into continuous ranges.
/// Every partition is responsible for keys between the hash of its predecessor (exclusive)
/// and its own hash (inclusive).
///
/// Whenever a new partition is added, it takes over a part of the range previously managed
/// by its successor (the partition with the next higher hash, wrapping around the ring).
///
/// Whenever a partition is removed, its range is given back to its successor.
type ConsistentHashRing =
    { /// Part of the keyspace `myself` is responsible for.
      range: RingRange
      /// Hash of `myself`.
      hash: int
      /// Partition from whose perspective this ring is seen.
      myself: string
      /// Known partitions as (hash, name) pairs, kept sorted by hash.
      partitions: struct (int * string) array }
[<RequireQualifiedAccess>]
module ConsistentHashRing =

    /// Computes a consistent hash of a given string.
    let inline hash (partition: string) = Akka.Util.MurmurHash.StringHash(partition)

    /// Creates a new instance of `ConsistentHashRing` bound to a perspective of a provided partition.
    /// Being the only known partition, it starts responsible for the entire keyspace.
    let create (myself: string) =
        let h = hash myself
        { range = RingRange.full
          hash = h
          myself = myself
          partitions = [| struct(h, myself) |] }

    /// Returns a partition responsible for a given consistent `hash`: the first partition whose hash
    /// is greater or equal to it, wrapping around to the first partition when `hash` is greater than
    /// every partition hash. Returns None only for a ring with no partitions.
    let tryFindByHash hash (ring: ConsistentHashRing) =
        ring.partitions
        |> Array.tryPick (fun (struct(h, p)) -> if h >= hash then Some p else None)
        |> Option.orElse (ring.partitions |> Array.tryHead |> Option.map (fun (struct(_, p)) -> p))

    /// Adds a `partition` to a `ring` and returns the updated ring. When a partition with the same
    /// hash is already known, the ring is returned unchanged. When the new partition becomes the direct
    /// predecessor of `myself`, the range of `myself` shrinks to (new partition hash, my hash].
    let add partition (ring: ConsistentHashRing) =
        let h = hash partition
        if ring.partitions |> Array.exists (fun (struct(x, _)) -> x = h) then ring
        else
            // partitions stay sorted by hash: insert right after the last smaller hash
            let idx =
                ring.partitions
                |> Array.tryFindIndexBack (fun (struct(x, _)) -> x < h)
                |> Option.map ((+) 1)
                |> Option.defaultValue 0
            let partitions = Array.insertAt idx (struct(h, partition)) ring.partitions
            let updated = { ring with partitions = partitions }
            // the successor wraps around to index 0 when the new partition was appended at the end
            let struct(_, successor) = partitions.[(idx + 1) % partitions.Length]
            if successor = ring.myself then
                // new node took over some of my responsibility
                { updated with range = Continuous (struct(h, ring.hash)) }
            else updated

    /// Removes a `partition` from a `ring` and returns the updated ring; unknown partitions are ignored.
    /// When the removed partition was the direct predecessor of `myself`, `myself` takes over its range,
    /// which then extends back to the hash of the new predecessor.
    let remove partition (ring: ConsistentHashRing) =
        match ring.partitions |> Array.tryFindIndex (fun (struct(_, p)) -> p = partition) with
        | None -> ring
        | Some removeIdx ->
            let partitions = Array.removeAt removeIdx ring.partitions
            let updated = { ring with partitions = partitions }
            match partitions |> Array.tryFindIndex (fun (struct(_, p)) -> p = ring.myself) with
            | None -> updated // `myself` was removed: there is no own range left to recompute
            | Some selfIdx ->
                let wasPredecessor = removeIdx = selfIdx || (selfIdx = 0 && removeIdx = partitions.Length)
                if not wasPredecessor then updated
                elif partitions.Length = 1 then { updated with range = RingRange.full }
                else
                    // the predecessor of the first partition is the last one (ring wrap-around)
                    let predecessorIdx = (selfIdx - 1 + partitions.Length) % partitions.Length
                    let struct(predecessorHash, _) = partitions.[predecessorIdx]
                    { updated with range = Continuous (struct(predecessorHash, ring.hash)) }
/// Hash ring in which every partition is represented by a number of virtual buckets
/// spread over the keyspace.
type VirtualBucketRing =
    { /// Hash of `myself`.
      hash: int
      /// Partition from whose perspective this ring is seen.
      myself: string
      /// Part of the keyspace owned by `myself`.
      range: RingRange
      /// Number of virtual buckets placed for every partition.
      bucketsPerPartition: int
      /// Bucket hash -> name of the partition owning that bucket.
      buckets: Map<int, string> }
[<RequireQualifiedAccess>]
module VirtualBucketRing =

    /// Computes a consistent hash of a given string.
    let inline hash (partition: string) = Akka.Util.MurmurHash.StringHash(partition)

    /// Returns the partition owning the bucket responsible for a given consistent `hash`: the bucket
    /// with the smallest key greater or equal to `hash`, wrapping around to the first bucket otherwise.
    /// Returns None only when the ring has no buckets.
    let tryFindByHash hash (ring: VirtualBucketRing) =
        let found = ring.buckets |> Seq.tryPick (fun e -> if e.Key >= hash then Some e.Value else None)
        found |> Option.orElse (ring.buckets |> Seq.tryHead |> Option.map (fun e -> e.Value))

    /// Produces `count` bucket hashes for a partition by hashing "<value><index>".
    let private uniformHashes (value: string) (count: int) : int[] =
        Array.init count (fun i -> hash (value + string i))

    /// Computes the ring range owned by `partition`. Each bucket owns keys from the previous bucket's key
    /// (exclusive) up to its own key (inclusive); the first bucket also owns the wrap-around range after
    /// the last bucket's key. An empty bucket map yields the full range.
    let private update partition (buckets: Map<int, string>) =
        let entries = Map.toArray buckets // sorted by bucket key
        if entries.Length = 0 then RingRange.full
        else
            let (firstKey, firstOwner) = entries.[0]
            let (lastKey, _) = entries.[entries.Length - 1]
            let ranges = ResizeArray<Range>()
            for ((prevKey, _), (key, owner)) in Array.pairwise entries do
                if owner = partition then ranges.Add(struct(prevKey, key))
            if firstOwner = partition then // keyspace overlap
                ranges.Add(struct(lastKey, firstKey))
            RingRange.create (ranges.ToArray())

    /// Adds a `partition` by placing `bucketsPerPartition` virtual buckets for it; already known
    /// partitions are ignored. When a bucket hash collides with another partition's bucket, the
    /// partition with the ordinally smaller name keeps it, so every node resolves collisions alike.
    let add partition (ring: VirtualBucketRing) =
        if ring.buckets |> Map.exists (fun _ owner -> owner = partition) then ring
        else
            let buckets =
                uniformHashes partition ring.bucketsPerPartition
                |> Array.fold (fun buckets h ->
                    match Map.tryFind h buckets with
                    | Some other when partition > other -> buckets
                    | _ -> Map.add h partition buckets) ring.buckets
            { ring with range = update ring.myself buckets; buckets = buckets }

    /// Removes a `partition` from a `ring`, dropping only the buckets it actually owns, so buckets
    /// won by other partitions on a hash collision are left intact. Unknown partitions are ignored.
    /// NOTE(review): a partition that previously lost a collision to the removed one does not get
    /// that bucket back - it would need to be re-added.
    let remove partition (ring: VirtualBucketRing) =
        if ring.buckets |> Map.exists (fun _ owner -> owner = partition) then
            let buckets = ring.buckets |> Map.filter (fun _ owner -> owner <> partition)
            { ring with range = update ring.myself buckets; buckets = buckets }
        else ring

    /// Creates a new ring seen from the perspective of `myself`, initially owning every bucket
    /// and therefore the entire keyspace.
    let create bucketsPerPartition myself =
        let h = hash myself
        let ring =
            { hash = h
              myself = myself
              range = RingRange.full
              buckets = Map.empty
              bucketsPerPartition = bucketsPerPartition }
        { add myself ring with range = RingRange.full }
module Tests =
    open Expecto

    /// Expecto tests for `Range` and both ring implementations.
    /// Registered via `testList`: a focused `ftestList` would make Expecto skip every
    /// non-focused test in the assembly (and fail runs using --fail-on-focused-tests).
    [<Tests>]
    let rangeTests =
        testList "Dht" [
            test "Range full check" {
                Expect.isTrue (Range.isFull Range.full) "full range"
                Expect.isFalse (Range.isFull (struct(0, 1))) "(0,1] should not be full"
                Expect.isFalse (Range.isFull (struct(1, 0))) "(1,0] should not be full"
                Expect.isFalse (Range.isFull (struct(2, 5))) "(2,5] should not be full"
                Expect.isFalse (Range.isFull (struct(5, 2))) "(5,2] should not be full"
            }
            test "Range contains" {
                let r = struct(1, 5)
                Expect.isFalse (Range.contains 0 r) "(1,5] should not contain 0"
                Expect.isFalse (Range.contains 1 r) "(1,5] should not contain 1"
                Expect.isTrue (Range.contains 2 r) "(1,5] should contain 2"
                Expect.isTrue (Range.contains 4 r) "(1,5] should contain 4"
                Expect.isTrue (Range.contains 5 r) "(1,5] should contain 5"
                Expect.isFalse (Range.contains 6 r) "(1,5] should not contain 6"
            }
            test "Range contains outer" {
                let r = struct(5, 1)
                Expect.isFalse (Range.contains 4 r) "(5,1] should not contain 4"
                Expect.isFalse (Range.contains 5 r) "(5,1] should not contain 5"
                Expect.isTrue (Range.contains 6 r) "(5,1] should contain 6"
                Expect.isTrue (Range.contains 1000 r) "(5,1] should contain 1000"
                Expect.isTrue (Range.contains 0 r) "(5,1] should contain 0"
                Expect.isTrue (Range.contains 1 r) "(5,1] should contain 1"
                Expect.isFalse (Range.contains 2 r) "(5,1] should not contain 2"
            }
            test "Range overlaps" {
                let r = struct(1, 5)
                Expect.isFalse (Range.overlap r (struct(5, 1))) "Ranges (1,5] and (5,1] should not overlap"
                Expect.isFalse (Range.overlap r (struct(0, 1))) "Ranges (1,5] and (0,1] should not overlap"
                Expect.isFalse (Range.overlap r (struct(5, 100))) "Ranges (1,5] and (5,100] should not overlap"
                Expect.isTrue (Range.overlap r (struct(0, 2))) "Ranges (1,5] and (0,2] should overlap"
                Expect.isTrue (Range.overlap r (struct(2, 4))) "Ranges (1,5] and (2,4] should overlap"
                Expect.isTrue (Range.overlap r (struct(4, 1))) "Ranges (1,5] and (4,1] should overlap"
                Expect.isTrue (Range.overlap r (struct(1, 6))) "Ranges (1,5] and (1,6] should overlap"
            }
            test "Range merge" {
                let r = struct(1, 5)
                Expect.equal Range.full (Range.merge r (struct(5, 1))) ""
                Expect.equal (struct(0, 6)) (Range.merge r (struct(0, 6))) ""
                Expect.equal (struct(1, 6)) (Range.merge r (struct(1, 6))) ""
                Expect.equal (struct(1, 6)) (Range.merge r (struct(2, 6))) ""
                Expect.equal (struct(1, 6)) (Range.merge r (struct(5, 6))) ""
                Expect.equal (struct(1, 0)) (Range.merge r (struct(5, 0))) ""
                let r = struct(5, 1)
                Expect.equal Range.full (Range.merge r (struct(1, 5))) ""
                Expect.equal Range.full (Range.merge r (struct(1, 6))) ""
                Expect.equal (struct(5, 1)) (Range.merge r (struct(7, 0))) ""
                Expect.equal (struct(4, 1)) (Range.merge r (struct(4, 0))) ""
                Expect.equal (struct(5, 4)) (Range.merge r (struct(1, 4))) ""
            }
            test "Consistent hash ring" {
                let a = ConsistentHashRing.create "A"
                let b = ConsistentHashRing.create "B"
                Expect.isTrue (RingRange.isFull a.range) "Ring A should start full"
                Expect.isTrue (RingRange.isFull b.range) "Ring B should start full"
                // connect nodes
                let a = ConsistentHashRing.add "B" a
                let b = ConsistentHashRing.add "A" b
                Expect.isFalse (RingRange.overlap a.range b.range) "Ranges of responsibilities for A and B should not overlap"
                Expect.isTrue (RingRange.isFull (RingRange.merge a.range b.range)) "Ranges of A and B should cover full keyspace"
                // ensure that we got consistent responses, no matter which side we ask
                for value in ["hello"; "world"] do
                    let hash = ConsistentHashRing.hash value
                    let ka = ConsistentHashRing.tryFindByHash hash a
                    let kb = ConsistentHashRing.tryFindByHash hash b
                    Expect.equal ka kb (sprintf "Both rings A and B placed '%s' on the same partition" value)
            }
            test "Virtual bucket ring" {
                let a = VirtualBucketRing.create 10 "A"
                let b = VirtualBucketRing.create 10 "B"
                Expect.isTrue (RingRange.isFull a.range) "Ring A should start full"
                Expect.isTrue (RingRange.isFull b.range) "Ring B should start full"
                // connect nodes
                let a = VirtualBucketRing.add "B" a
                let b = VirtualBucketRing.add "A" b
                Expect.isFalse (RingRange.overlap a.range b.range) "Ranges of responsibilities for A and B should not overlap"
                Expect.isTrue (RingRange.isFull (RingRange.merge a.range b.range)) "Ranges of A and B should cover full keyspace"
                // ensure that we got consistent responses, no matter which side we ask
                for value in ["hello"; "world"] do
                    let hash = VirtualBucketRing.hash value
                    let ka = VirtualBucketRing.tryFindByHash hash a
                    let kb = VirtualBucketRing.tryFindByHash hash b
                    Expect.equal ka kb (sprintf "Both rings A and B placed '%s' on the same partition" value)
            }
        ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment