Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@lexi-lambda
Last active May 26, 2020 11:36
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lexi-lambda/c54c91867f931b56123e3c595d8e445a to your computer and use it in GitHub Desktop.
Save lexi-lambda/c54c91867f931b56123e3c595d8e445a to your computer and use it in GitHub Desktop.
#lang racket/base
; Lock-free FIFO queues, loosely based on “Simple, Fast, and Practical
; Non-Blocking and Blocking Concurrent Queue Algorithms”.
(require racket/contract
racket/future
(only-in racket/unsafe/ops unsafe-struct*-cas!))
(provide queue?
make-queue
(contract-out [enqueue! (-> queue? any/c void?)]
[dequeue! (-> queue? any/c)]
[try-dequeue! (-> queue? any/c)]
[queue-length (-> queue? exact-nonnegative-integer?)]))
; A queue is two pointers into a mutable, singly-linked list of nodes:
; head: Points at the first node in the list.
; tail: Usually points at the last node in the list, but briefly points at the
; second-to-last node during an enqueue! operation.
; It also contains a future semaphore, which allows dequeue! to block on an
; empty queue (and doubles as a length counter).
;
; Note that head and tail are *always* nodes, even if the queue is empty. To
; maintain this invariant, head does not actually contain the first value in the
; queue. Rather, the first value in the queue is (node-value (node-next head)).
; This allows enqueue! to only touch the tail and dequeue! to only touch the
; head, avoiding a need for synchronization between modifications to both fields.
(struct queue ([head #:mutable] [tail #:mutable] semaphore) #:authentic)
; Mutable pairs a la mcons with support for atomic updates.
(struct node ([value #:mutable] [next #:mutable]) #:authentic)
; It would be cool if #:authentic generated these for mutable fields.
(define (queue-head-cas! q old new)
(unless (queue? q) (raise-argument-error 'queue-head-cas! "queue?" q))
(unsafe-struct*-cas! q 0 old new))
(define (queue-tail-cas! q old new)
(unless (queue? q) (raise-argument-error 'queue-tail-cas! "queue?" q))
(unsafe-struct*-cas! q 1 old new))
(define (node-next-cas! n old new)
(unless (node? n) (raise-argument-error 'node-next-cas! "node?" n))
(unsafe-struct*-cas! n 1 old new))
(define (make-queue)
(define n (node #f #f))
(queue n n (make-fsemaphore 0)))
(define (queue-length q)
(fsemaphore-count (queue-semaphore q)))
(define (enqueue! q v)
(define new (node v #f))
(let retry ()
(define old (queue-tail q))
(cond
; If (node-next old) is #f, then it is the end of the node list, so we can
; just insert our new node there.
[(node-next-cas! old #f new)
; We update tail via CAS because concurrent enqueue! operations will
; advance it themselves if it falls behind so they don’t have to wait for
; us. By the time we get a turn, tail may have been advanced past our
; node, so we want to just return.
(queue-tail-cas! q old new)
(fsemaphore-post (queue-semaphore q))]
[else
; Another enqueue! beat us; ensure the tail is updated and try again.
(queue-tail-cas! q old (node-next old))
(retry)])))
(define (dequeue! q)
(fsemaphore-wait (queue-semaphore q))
(do-dequeue! q))
(define (try-dequeue! q)
(and (fsemaphore-try-wait? (queue-semaphore q))
(do-dequeue! q)))
; Precondition: q is non-empty.
(define (do-dequeue! q)
(let retry ()
(define old (queue-head q))
(define new (node-next old))
(cond
[(queue-head-cas! q old new)
; Return the dequeued value, setting the value of the new head to #f
; to preserve invariant F. This ensures we don’t needlessly keep the
; GC from collecting the value.
(define value (node-value new))
(set-node-value! new #f)
value]
[else
; Another dequeue! beat us; try again.
(retry)])))
(module* test racket/base
(require racket/future rackunit (submod ".."))
(define (box-add! b n)
(let retry ()
(define old (unbox b))
(unless (box-cas! b old (+ old n))
(retry))))
(define (run-test-configuration #:futures futures
#:elements elements
#:iterations iterations)
(with-check-info (['futures futures]
['elements elements]
['iterations iterations])
(test-begin
(define queue (make-queue))
(define total (box 0))
(define producers
(for/list ([i (in-range futures)])
(future (λ () (for ([j (in-range elements)])
(enqueue! queue (for/sum ([k (in-range iterations)])
(+ (* elements iterations i)
(* iterations j)
k))))))))
(define consumers
(for/list ([i (in-range futures)])
(future (λ () (for ([j (in-range elements)])
(box-add! total (dequeue! queue)))))))
(define forcers
(for/list ([f (in-sequences producers consumers)])
(thread (λ () (touch f)))))
(for-each thread-wait forcers)
(check-equal? (unbox total) (for/sum ([i (* futures elements iterations)]) i)))))
(run-test-configuration #:futures 10 #:elements 1000 #:iterations 1000)
(run-test-configuration #:futures 3 #:elements 10000 #:iterations 500)
(run-test-configuration #:futures 50 #:elements 1000 #:iterations 200))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment