Skip to content

Instantly share code, notes, and snippets.

@dkochmanski
Created March 28, 2019 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkochmanski/4de37d4e7432d8d59afa7d2a64a4bb41 to your computer and use it in GitHub Desktop.
Save dkochmanski/4de37d4e7432d8d59afa7d2a64a4bb41 to your computer and use it in GitHub Desktop.
;;; AGPL-3.0-or-later, (c) Daniel Kochmański 2019
;;;
;;; Mailbox implementation on top of condition variables and locks
;;; interface (provided by bordeaux-threads). It is
;;; interface-compatible with SBCL implementation in sb-currency
;;; module. Tests attached below, it is expected that alexandria,
;;; bordeaux-threads and fiveam are loaded before running this file.
;;; (asdf:load-system '(alexandria bordeaux-threads fiveam))
(defpackage #:emulated-mailbox
(:use #:cl #:alexandria)
(:export #:mailbox #:make-mailbox #:mailboxp #:mailbox-name #:mailbox-count #:mailbox-empty-p
#:list-mailbox-messages #:receive-message #:receive-message-no-hang
#:receive-pending-messages #:send-message))
(in-package #:end-utilities.emulated-mailbox)
(defstruct (mailbox (:constructor %make-mailbox)
(:predicate mailboxp))
%lock %cvar %messages %messages-tail %count name)
(defmethod print-object ((object mailbox) stream)
(print-unreadable-object (object stream :type t :identity t)
(if-let ((name (mailbox-name object))
(count (mailbox-%count object)))
(format stream "(~s ~s msgs pending)" name count)
(format stream "(~s msgs pending)" count))))
(defun make-mailbox (&key name initial-contents)
(check-type name (or null string))
(check-type initial-contents list)
(let ((locative (list nil)))
(%make-mailbox :name name
:%lock (bt:make-lock name)
:%cvar (bt:make-condition-variable :name name)
:%count (length initial-contents)
:%messages (append initial-contents locative)
:%messages-tail locative)))
(defun list-mailbox-messages (mailbox)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(loop ; last cons is a locative
for (i . j) on (mailbox-%messages mailbox)
until (null j) collect i)))
(defun mailbox-count (mailbox)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(mailbox-%count mailbox)))
(defun mailbox-empty-p (mailbox)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(zerop (mailbox-%count mailbox))))
(defun receive-message (mailbox &key timeout)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(loop
while (zerop (mailbox-%count mailbox))
do (or (bt:condition-wait (mailbox-%cvar mailbox)
(mailbox-%lock mailbox)
:timeout timeout)
(return-from receive-message (values nil nil))))
(decf (mailbox-%count mailbox))
(values (pop (mailbox-%messages mailbox)) t)))
(defun receive-message-no-hang (mailbox)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(unless (zerop (mailbox-%count mailbox))
(decf (mailbox-%count mailbox))
(values (pop (mailbox-%messages mailbox)) t))))
(defun receive-pending-messages (mailbox &optional n)
(check-type mailbox mailbox)
(check-type n (or null (integer 0)))
(bt:with-lock-held ((mailbox-%lock mailbox))
(unless (zerop (mailbox-%count mailbox))
(unless n (setf n (mailbox-%count mailbox)))
(alexandria:minf n (mailbox-%count mailbox))
(decf (mailbox-%count mailbox) n)
(loop
repeat n
collect (pop (mailbox-%messages mailbox))))))
(defun send-message (mailbox message)
(check-type mailbox mailbox)
(bt:with-lock-held ((mailbox-%lock mailbox))
(incf (mailbox-%count mailbox))
(rplaca (mailbox-%messages-tail mailbox) message)
(setf (mailbox-%messages-tail mailbox)
(cdr (rplacd (mailbox-%messages-tail mailbox) (list nil))))
t))
(5am:test mailbox-send-receive
(let ((mailbox (make-mailbox :name "my smilebox")))
(5am:is (mailboxp mailbox))
(5am:is (not (mailboxp 42)))
(5am:is (equalp (mailbox-name mailbox) "my smilebox"))
;; Add some
(send-message mailbox "daniel")
(send-message mailbox "ma")
(send-message mailbox 4)
;; Ensure FIFO order
(5am:is (string= "daniel" (receive-message mailbox)))
;; Add some more
(send-message mailbox "zielone")
(send-message mailbox "koty")
;; Ensure counting
(5am:is (= 4 (mailbox-count mailbox)))
(5am:is (null (mailbox-empty-p mailbox)))
;; Ensure FIFO order and and other receive operations
(5am:is (equalp "ma" (receive-message mailbox)))
(5am:is (equalp 4 (receive-message mailbox)))
(5am:is (equalp "zielone" (receive-message-no-hang mailbox)))
(5am:is (= 1 (mailbox-count mailbox)))
(5am:is (equalp "koty" (receive-message-no-hang mailbox)))
;; Ensure empty box behavior
(5am:is (= 0 (mailbox-count mailbox)))
(5am:is (mailbox-empty-p mailbox))
(5am:is (null (receive-message-no-hang mailbox)))
(5am:is (null (receive-message mailbox :timeout 0.1)))
(5am:is (= 0 (mailbox-count mailbox)))
(5am:is (mailbox-empty-p mailbox))
;; add some new data and verify group operations
(dotimes (v 10) (send-message mailbox v))
(5am:is (= 10 (mailbox-count mailbox)))
(5am:is (not (mailbox-empty-p mailbox)))
(5am:is (equalp '(0 1 2 3 4 5 6 7 8 9) (list-mailbox-messages mailbox)))
(5am:is (null (receive-pending-messages mailbox 0)))
(5am:signals type-error (receive-pending-messages mailbox -1))
(5am:is (= 10 (mailbox-count mailbox)))
(5am:is (not (mailbox-empty-p mailbox)))
(5am:is (equalp '(0 1 2 3 4) (receive-pending-messages mailbox 5)))
(5am:is (equalp '(5 6 7 8 9) (list-mailbox-messages mailbox)))
(5am:is (= (mailbox-count mailbox) 5))
(5am:is (not (mailbox-empty-p mailbox)))
(5am:is (equalp '(5 6 7 8 9) (receive-pending-messages mailbox 20)))
(5am:is (= (mailbox-count mailbox) 0))
(5am:is (mailbox-empty-p mailbox))
(5am:is (null (receive-pending-messages mailbox 20)))
(5am:is (= 0 (mailbox-count mailbox)))
(5am:is (mailbox-empty-p mailbox))
;; receive-pending-messages without optional argument
(dotimes (v 10) (send-message mailbox v))
(5am:is (= 10 (mailbox-count mailbox)))
(5am:is (not (mailbox-empty-p mailbox)))
(5am:is (equalp '(0 1 2 3 4 5 6 7 8 9) (receive-pending-messages mailbox)))
(5am:is (= 0 (mailbox-count mailbox)))
(5am:is (mailbox-empty-p mailbox))))
(5am:test producent-consumers
(let (;; we start from 0 and always send messages
;; monotonically increasing. That means that each
;; consumer has monotonically increasing
;; elements. Note that results-mailbox may have all
;; that jumbled up, because different consumers may
;; send at different times.
(mailbox (make-mailbox :initial-contents '(0)))
;; we will submit all results to the second mailbox,
;; then sort them and verify if nothing got lost. This
;; also gives us concurrent send-message.
(results-mailbox (make-mailbox))
;; all errors in a thread should be send here. For
;; intance if messages are not monotonous from the
;; consumer perspective it is an ordering issue.
(errors-mailbox (make-mailbox))
;; flag to tell consumers that we are full. We always
;; have a timeout in make-1-consumer (and
;; make-n-consumer returns immedietely), so we may
;; safely assume that join-thread will return.
(konsument-je-żeby-jeść-p nil)
;; last added element to the mailbox called from
;; producent thrad and *not* thread-safe - that means
;; that only one producent at a time should run. For
;; multiple producent we resend to results-mailbox
;; from consumer threads.
(last-element-added 0))
(flet ((make-producent (n &optional sleep)
(lambda ()
(loop for i from 1 upto n
do (progn (send-message mailbox (+ last-element-added i))
(when sleep (sleep sleep))))
(incf last-element-added n)))
;; Consumers return T if they didn't put hand on any message.
(make-1-consumer (&optional timeout sleep)
(lambda ()
(loop until (and konsument-je-żeby-jeść-p
(mailbox-empty-p mailbox))
with last-message = nil
do (when-let ((message (if timeout
(receive-message mailbox :timeout timeout)
(receive-message-no-hang mailbox))))
(when (and last-message (> last-message message))
(send-message errors-mailbox (cons last-message message)))
(setf last-message message)
(send-message results-mailbox message)
(when sleep (sleep sleep)))
finally (return (not last-message)))))
(make-n-consumer (batch-size &optional sleep)
(lambda ()
(loop until (and konsument-je-żeby-jeść-p
(mailbox-empty-p mailbox))
with send = (curry #'send-message results-mailbox)
with starved = t
do (progn
(when-let ((messages (receive-pending-messages mailbox batch-size)))
(mapc send messages)
(setf starved nil))
(when sleep (sleep sleep)))
finally (return starved)))))
(let ((producent (bt:make-thread (make-producent 5000000)))
(threads (append (loop repeat 4 collect (bt:make-thread (make-1-consumer)))
(loop repeat 4 collect (bt:make-thread (make-1-consumer nil 0.1)))
(loop repeat 4 collect (bt:make-thread (make-1-consumer 1)))
(loop repeat 4 collect (bt:make-thread (make-n-consumer 3))))))
(bt:join-thread producent)
(setf konsument-je-żeby-jeść-p t)
(5am:is (every (compose #'not #'bt:join-thread) threads)
"Some consumers were starved to death")
(5am:is (mailbox-empty-p errors-mailbox)
"Some messages were not in FIFO order")
(5am:is (= (mailbox-count results-mailbox) (1+ last-element-added))
"Some messages were not processed")))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment