Skip to content

Instantly share code, notes, and snippets.

@polytypic
Last active November 13, 2023 09:26
Show Gist options
  • Save polytypic/70d5f7981a61a8c4ad4b9cce590d6f72 to your computer and use it in GitHub Desktop.
Save polytypic/70d5f7981a61a8c4ad4b9cce590d6f72 to your computer and use it in GitHub Desktop.
A lock-free skiplist
(* Copyright (c) 2023 Vesa Karvonen
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE. *)
(* This implementation has been written from scratch with inspiration from a
lock-free skiplist implementation in PR
https://github.com/ocaml-multicore/saturn/pull/65
by
Sooraj Srinivasan ( https://github.com/sooraj-srini )
including tests and changes by
Carine Morel ( https://github.com/lyrm ). *)
(* TODO: Grow and possibly shrink the skiplist or e.g. adjust search and node
generation based on the dynamic number of bindings. *)
(* OCaml doesn't allow us to use one of the unused (always 0) bits in pointers
for the marks and an indirection is needed. This representation avoids the
indirection except for marked references in nodes to be removed. A GADT with
polymorphic variants is used to disallow nested [Mark]s. *)
type ('k, 'v, _) node =
| Null : ('k, 'v, [> `Null ]) node
| Node : {
key : 'k;
value : 'v;
next : ('k, 'v) links;
}
-> ('k, 'v, [> `Node ]) node
| Mark : ('k, 'v, [< `Null | `Node ]) node -> ('k, 'v, [ `Mark ]) node
(* The implementation relies on this existential being unboxed. More
specifically, it is assumed that [Link node == Link node] meaning that the
[Link] constructor does not allocate. *)
and ('k, 'v) link = Link : ('k, 'v, _) node -> ('k, 'v) link [@@unboxed]
and ('k, 'v) links = ('k, 'v) link Atomic.t array
(* Encoding the [compare] function using an algebraic type would allow the
overhead of calling a closure to be avoided for selected primitive types like
[int]. *)
type ('k, 'v) t = { compare : 'k -> 'k -> int; root : ('k, 'v) links }
(* *)
(** [get_random_height max_height] gives a random value [n] in the range from
[1] to [max_height] with the desired distribution such that [n] is twice as
likely as [n + 1]. *)
let rec get_random_height max_height =
let m = (1 lsl max_height) - 1 in
let x = Random.bits () land m in
if x = 1 then
(* We reject [1] to get the desired distribution. *)
get_random_height max_height
else
(* We do a binary search for the highest 1 bit. Techniques in
Using de Bruijn Sequences to Index a 1 in a Computer Word
by Leiserson, Prokop, and Randall
could perhaps speed this up a bit, but this likely not performance
critical. *)
let n = 0 in
let n, x = if 0xFFFF < x then (n + 0x10, x lsr 0x10) else (n, x) in
let n, x = if 0x00FF < x then (n + 0x08, x lsr 0x08) else (n, x) in
let n, x = if 0x000F < x then (n + 0x04, x lsr 0x04) else (n, x) in
let n, x = if 0x0003 < x then (n + 0x02, x lsr 0x02) else (n, x) in
let n, _ = if 0x0001 < x then (n + 0x01, x lsr 0x01) else (n, x) in
max_height - n
(* *)
let[@inline] is_marked = function
| Link (Mark _) -> true
| Link (Null | Node _) -> false
(* *)
let rec find_in t key prev preds succs level lowest =
(* Breaking the sequence of dependent loads could improve performance. *)
let prev_at_level = Array.unsafe_get prev level in
match Atomic.get prev_at_level with
| Link Null ->
Array.unsafe_set preds level prev_at_level;
Array.unsafe_set succs level Null;
lowest < level && find_in t key prev preds succs (level - 1) lowest
| Link (Node r as curr_node) as before -> begin
match Atomic.get (Array.unsafe_get r.next level) with
| Link (Null | Node _) ->
let c = t.compare key r.key in
if 0 < c then find_in t key r.next preds succs level lowest
else begin
Array.unsafe_set preds level prev_at_level;
Array.unsafe_set succs level curr_node;
if lowest < level then
find_in t key prev preds succs (level - 1) lowest
else 0 = c
end
| Link (Mark after) ->
Atomic.compare_and_set prev_at_level before (Link after) |> ignore;
find_in t key prev preds succs level lowest
end
| Link (Mark _) ->
find_in t key t.root preds succs (Array.length t.root - 1) lowest
(** [find_in t key preds succs lowest] tries to find the node with the specified
[key], updating [preds] and [succs] and removing nodes with marked
references along the way, and always descending down to [lowest] level. The
boolean return value is only meaningful when [lowest] is given as [0]. *)
let[@inline] find_in t key preds succs lowest =
find_in t key t.root preds succs (Array.length t.root - 1) lowest
(* *)
let rec find_node t key prev level =
(* Breaking the sequence of dependent loads could improve performance. *)
let prev_at_level = Array.unsafe_get prev level in
match Atomic.get prev_at_level with
| Link Null -> if 0 < level then find_node t key prev (level - 1) else Null
| Link (Node r as node) as before -> begin
match Atomic.get (Array.unsafe_get r.next level) with
| Link (Null | Node _) ->
let c = t.compare key r.key in
if 0 < c then find_node t key r.next level
else if 0 = c then
(* At this point we know the node was not removed, because removal
is done in order of descending levels. *)
node
else if 0 < level then find_node t key prev (level - 1)
else Null
| Link (Mark after) ->
Atomic.compare_and_set prev_at_level before (Link after) |> ignore;
find_node t key prev level
end
| Link (Mark _) -> find_node t key t.root (Array.length t.root - 1)
(** [find_node t key] tries to find the node with the specified [key], removing
nodes with marked references along the way, and stopping as soon as the node
is found. *)
let[@inline] find_node t key = find_node t key t.root (Array.length t.root - 1)
(* *)
let create ?(max_height = 10) ?(compare = compare) () =
(* The upper limit of [30] comes from [Random.bits ()] as well as from
limitations with 32-bit implementations. It should not be a problem in
practice. *)
if max_height < 1 || 30 < max_height then
invalid_arg "Skiplist: max_height must be in the range [1, 30]";
let root = Array.init max_height @@ fun _ -> Atomic.make (Link Null) in
{ compare; root }
(* *)
let find_opt t key =
match find_node t key with Null -> None | Node r -> Some r.value
(* *)
let mem t key = find_node t key != Null
(* *)
let rec add t key value preds succs =
(not (find_in t key preds succs 0))
&&
let next =
let height = get_random_height (Array.length t.root) in
Array.init height @@ fun i -> Atomic.make (Link (Array.unsafe_get succs i))
in
let node = Link (Node { key; value; next }) in
if
let succ = Link (Array.unsafe_get succs 0) in
Atomic.compare_and_set (Array.unsafe_get preds 0) succ node
then
(* The node is now considered as added to the skiplist. *)
let rec update_levels level =
if Array.length next = level then begin
if is_marked (Atomic.get (Array.unsafe_get next (level - 1))) then begin
(* The node we finished adding has been removed concurrently. To
ensure that no references we added to the node remain, we call
[find_node] which will remove nodes with marked references along
the way. *)
find_node t key |> ignore
end;
true
end
else if
let succ = Link (Array.unsafe_get succs level) in
Atomic.compare_and_set (Array.unsafe_get preds level) succ node
then update_levels (level + 1)
else
let _found = find_in t key preds succs level in
let rec update_nexts level' =
if level' < level then update_levels level
else
let next = Array.unsafe_get next level' in
match Atomic.get next with
| Link (Null | Node _) as before ->
let succ = Link (Array.unsafe_get succs level') in
if before != succ then
if Atomic.compare_and_set next before succ then
update_nexts (level' - 1)
else update_levels level
else update_nexts (level' - 1)
| Link (Mark _) ->
(* The node we were trying to add has been removed concurrently.
To ensure that no references we added to the node remain, we
call [find_node] which will remove nodes with marked
references along the way. *)
find_node t key |> ignore;
true
in
update_nexts (Array.length next - 1)
in
update_levels 1
else add t key value preds succs
let add t key value =
let preds = Array.make (Array.length t.root) (Obj.magic ()) in
let succs = Array.make (Array.length t.root) Null in
add t key value preds succs
(* *)
let remove t key =
match find_node t key with
| Null -> false
| Node { next; _ } ->
let rec update_upper_levels level =
if 0 < level then
let link = Array.unsafe_get next level in
match Atomic.get link with
| Link (Mark _) -> update_upper_levels (level - 1)
| Link ((Null | Node _) as succ) ->
let marked_succ = Mark succ in
if Atomic.compare_and_set link (Link succ) (Link marked_succ) then
update_upper_levels (level - 1)
else update_upper_levels level
in
update_upper_levels (Array.length next - 1);
let rec try_update_bottom_level link =
match Atomic.get link with
| Link (Mark _) -> false
| Link ((Null | Node _) as succ) ->
let marked_succ = Mark succ in
if Atomic.compare_and_set link (Link succ) (Link marked_succ) then
(* We have finished marking references on the node. To ensure
that no references to the node remain, we call [find_node]
which will remove nodes with marked references along the
way. *)
let _node = find_node t key in
true
else try_update_bottom_level link
in
try_update_bottom_level (Array.unsafe_get next 0)
(** A lock-free skiplist. *)
type (!'k, !'v) t
(** *)
val create : ?max_height:int -> ?compare:('k -> 'k -> int) -> unit -> ('k, 'v) t
(** *)
val find_opt : ('k, 'v) t -> 'k -> 'v option
(** *)
val mem : ('k, 'v) t -> 'k -> bool
(** *)
val add : ('k, 'v) t -> 'k -> 'v -> bool
(** *)
val remove : ('k, 'v) t -> 'k -> bool
(** *)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment