-
-
Save dkochmanski/4de37d4e7432d8d59afa7d2a64a4bb41 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;; AGPL-3.0-or-later, (c) Daniel Kochmański 2019 | |
;;; | |
;;; Mailbox implementation on top of condition variables and locks | |
;;; interface (provided by bordeaux-threads). It is | |
;;; interface-compatible with SBCL implementation in sb-currency | |
;;; module. Tests attached below, it is expected that alexandria, | |
;;; bordeaux-threads and fiveam are loaded before running this file. | |
;;; (asdf:load-system '(alexandria bordeaux-threads fiveam)) | |
(defpackage #:emulated-mailbox | |
(:use #:cl #:alexandria) | |
(:export #:mailbox #:make-mailbox #:mailboxp #:mailbox-name #:mailbox-count #:mailbox-empty-p | |
#:list-mailbox-messages #:receive-message #:receive-message-no-hang | |
#:receive-pending-messages #:send-message)) | |
(in-package #:end-utilities.emulated-mailbox) | |
(defstruct (mailbox (:constructor %make-mailbox) | |
(:predicate mailboxp)) | |
%lock %cvar %messages %messages-tail %count name) | |
(defmethod print-object ((object mailbox) stream) | |
(print-unreadable-object (object stream :type t :identity t) | |
(if-let ((name (mailbox-name object)) | |
(count (mailbox-%count object))) | |
(format stream "(~s ~s msgs pending)" name count) | |
(format stream "(~s msgs pending)" count)))) | |
(defun make-mailbox (&key name initial-contents) | |
(check-type name (or null string)) | |
(check-type initial-contents list) | |
(let ((locative (list nil))) | |
(%make-mailbox :name name | |
:%lock (bt:make-lock name) | |
:%cvar (bt:make-condition-variable :name name) | |
:%count (length initial-contents) | |
:%messages (append initial-contents locative) | |
:%messages-tail locative))) | |
(defun list-mailbox-messages (mailbox) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(loop ; last cons is a locative | |
for (i . j) on (mailbox-%messages mailbox) | |
until (null j) collect i))) | |
(defun mailbox-count (mailbox) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(mailbox-%count mailbox))) | |
(defun mailbox-empty-p (mailbox) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(zerop (mailbox-%count mailbox)))) | |
(defun receive-message (mailbox &key timeout) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(loop | |
while (zerop (mailbox-%count mailbox)) | |
do (or (bt:condition-wait (mailbox-%cvar mailbox) | |
(mailbox-%lock mailbox) | |
:timeout timeout) | |
(return-from receive-message (values nil nil)))) | |
(decf (mailbox-%count mailbox)) | |
(values (pop (mailbox-%messages mailbox)) t))) | |
(defun receive-message-no-hang (mailbox) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(unless (zerop (mailbox-%count mailbox)) | |
(decf (mailbox-%count mailbox)) | |
(values (pop (mailbox-%messages mailbox)) t)))) | |
(defun receive-pending-messages (mailbox &optional n) | |
(check-type mailbox mailbox) | |
(check-type n (or null (integer 0))) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(unless (zerop (mailbox-%count mailbox)) | |
(unless n (setf n (mailbox-%count mailbox))) | |
(alexandria:minf n (mailbox-%count mailbox)) | |
(decf (mailbox-%count mailbox) n) | |
(loop | |
repeat n | |
collect (pop (mailbox-%messages mailbox)))))) | |
(defun send-message (mailbox message) | |
(check-type mailbox mailbox) | |
(bt:with-lock-held ((mailbox-%lock mailbox)) | |
(incf (mailbox-%count mailbox)) | |
(rplaca (mailbox-%messages-tail mailbox) message) | |
(setf (mailbox-%messages-tail mailbox) | |
(cdr (rplacd (mailbox-%messages-tail mailbox) (list nil)))) | |
t)) | |
(5am:test mailbox-send-receive | |
(let ((mailbox (make-mailbox :name "my smilebox"))) | |
(5am:is (mailboxp mailbox)) | |
(5am:is (not (mailboxp 42))) | |
(5am:is (equalp (mailbox-name mailbox) "my smilebox")) | |
;; Add some | |
(send-message mailbox "daniel") | |
(send-message mailbox "ma") | |
(send-message mailbox 4) | |
;; Ensure FIFO order | |
(5am:is (string= "daniel" (receive-message mailbox))) | |
;; Add some more | |
(send-message mailbox "zielone") | |
(send-message mailbox "koty") | |
;; Ensure counting | |
(5am:is (= 4 (mailbox-count mailbox))) | |
(5am:is (null (mailbox-empty-p mailbox))) | |
;; Ensure FIFO order and and other receive operations | |
(5am:is (equalp "ma" (receive-message mailbox))) | |
(5am:is (equalp 4 (receive-message mailbox))) | |
(5am:is (equalp "zielone" (receive-message-no-hang mailbox))) | |
(5am:is (= 1 (mailbox-count mailbox))) | |
(5am:is (equalp "koty" (receive-message-no-hang mailbox))) | |
;; Ensure empty box behavior | |
(5am:is (= 0 (mailbox-count mailbox))) | |
(5am:is (mailbox-empty-p mailbox)) | |
(5am:is (null (receive-message-no-hang mailbox))) | |
(5am:is (null (receive-message mailbox :timeout 0.1))) | |
(5am:is (= 0 (mailbox-count mailbox))) | |
(5am:is (mailbox-empty-p mailbox)) | |
;; add some new data and verify group operations | |
(dotimes (v 10) (send-message mailbox v)) | |
(5am:is (= 10 (mailbox-count mailbox))) | |
(5am:is (not (mailbox-empty-p mailbox))) | |
(5am:is (equalp '(0 1 2 3 4 5 6 7 8 9) (list-mailbox-messages mailbox))) | |
(5am:is (null (receive-pending-messages mailbox 0))) | |
(5am:signals type-error (receive-pending-messages mailbox -1)) | |
(5am:is (= 10 (mailbox-count mailbox))) | |
(5am:is (not (mailbox-empty-p mailbox))) | |
(5am:is (equalp '(0 1 2 3 4) (receive-pending-messages mailbox 5))) | |
(5am:is (equalp '(5 6 7 8 9) (list-mailbox-messages mailbox))) | |
(5am:is (= (mailbox-count mailbox) 5)) | |
(5am:is (not (mailbox-empty-p mailbox))) | |
(5am:is (equalp '(5 6 7 8 9) (receive-pending-messages mailbox 20))) | |
(5am:is (= (mailbox-count mailbox) 0)) | |
(5am:is (mailbox-empty-p mailbox)) | |
(5am:is (null (receive-pending-messages mailbox 20))) | |
(5am:is (= 0 (mailbox-count mailbox))) | |
(5am:is (mailbox-empty-p mailbox)) | |
;; receive-pending-messages without optional argument | |
(dotimes (v 10) (send-message mailbox v)) | |
(5am:is (= 10 (mailbox-count mailbox))) | |
(5am:is (not (mailbox-empty-p mailbox))) | |
(5am:is (equalp '(0 1 2 3 4 5 6 7 8 9) (receive-pending-messages mailbox))) | |
(5am:is (= 0 (mailbox-count mailbox))) | |
(5am:is (mailbox-empty-p mailbox)))) | |
(5am:test producent-consumers | |
(let (;; we start from 0 and always send messages | |
;; monotonically increasing. That means that each | |
;; consumer has monotonically increasing | |
;; elements. Note that results-mailbox may have all | |
;; that jumbled up, because different consumers may | |
;; send at different times. | |
(mailbox (make-mailbox :initial-contents '(0))) | |
;; we will submit all results to the second mailbox, | |
;; then sort them and verify if nothing got lost. This | |
;; also gives us concurrent send-message. | |
(results-mailbox (make-mailbox)) | |
;; all errors in a thread should be send here. For | |
;; intance if messages are not monotonous from the | |
;; consumer perspective it is an ordering issue. | |
(errors-mailbox (make-mailbox)) | |
;; flag to tell consumers that we are full. We always | |
;; have a timeout in make-1-consumer (and | |
;; make-n-consumer returns immedietely), so we may | |
;; safely assume that join-thread will return. | |
(konsument-je-żeby-jeść-p nil) | |
;; last added element to the mailbox called from | |
;; producent thrad and *not* thread-safe - that means | |
;; that only one producent at a time should run. For | |
;; multiple producent we resend to results-mailbox | |
;; from consumer threads. | |
(last-element-added 0)) | |
(flet ((make-producent (n &optional sleep) | |
(lambda () | |
(loop for i from 1 upto n | |
do (progn (send-message mailbox (+ last-element-added i)) | |
(when sleep (sleep sleep)))) | |
(incf last-element-added n))) | |
;; Consumers return T if they didn't put hand on any message. | |
(make-1-consumer (&optional timeout sleep) | |
(lambda () | |
(loop until (and konsument-je-żeby-jeść-p | |
(mailbox-empty-p mailbox)) | |
with last-message = nil | |
do (when-let ((message (if timeout | |
(receive-message mailbox :timeout timeout) | |
(receive-message-no-hang mailbox)))) | |
(when (and last-message (> last-message message)) | |
(send-message errors-mailbox (cons last-message message))) | |
(setf last-message message) | |
(send-message results-mailbox message) | |
(when sleep (sleep sleep))) | |
finally (return (not last-message))))) | |
(make-n-consumer (batch-size &optional sleep) | |
(lambda () | |
(loop until (and konsument-je-żeby-jeść-p | |
(mailbox-empty-p mailbox)) | |
with send = (curry #'send-message results-mailbox) | |
with starved = t | |
do (progn | |
(when-let ((messages (receive-pending-messages mailbox batch-size))) | |
(mapc send messages) | |
(setf starved nil)) | |
(when sleep (sleep sleep))) | |
finally (return starved))))) | |
(let ((producent (bt:make-thread (make-producent 5000000))) | |
(threads (append (loop repeat 4 collect (bt:make-thread (make-1-consumer))) | |
(loop repeat 4 collect (bt:make-thread (make-1-consumer nil 0.1))) | |
(loop repeat 4 collect (bt:make-thread (make-1-consumer 1))) | |
(loop repeat 4 collect (bt:make-thread (make-n-consumer 3)))))) | |
(bt:join-thread producent) | |
(setf konsument-je-żeby-jeść-p t) | |
(5am:is (every (compose #'not #'bt:join-thread) threads) | |
"Some consumers were starved to death") | |
(5am:is (mailbox-empty-p errors-mailbox) | |
"Some messages were not in FIFO order") | |
(5am:is (= (mailbox-count results-mailbox) (1+ last-element-added)) | |
"Some messages were not processed"))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment