Skip to content

Instantly share code, notes, and snippets.

@AkashaP
Forked from artob/mqueue.asd
Last active April 13, 2022 16:12
Show Gist options
  • Save AkashaP/7081d9a6f1813b9e4c439f24b44684c9 to your computer and use it in GitHub Desktop.
Save AkashaP/7081d9a6f1813b9e4c439f24b44684c9 to your computer and use it in GitHub Desktop.
POSIX message queues for Common Lisp.
;; This is free and unencumbered software released into the public domain.
;; ASDF system definition for the POSIX message queue binding.
(asdf:defsystem :mqueue
  :name "mqueue"
  :description "POSIX message queues for Common Lisp."
  :version "0.0.0"
  :author "Arto Bendiken <arto@datagraph.org> modified by Akasha"
  ;; Besides CFFI, the source refers to symbols in the BORDEAUX-THREADS,
  ;; BABEL and SERAPEUM packages by package prefix.  Those packages must
  ;; exist before the file is read, so they are declared here explicitly.
  :depends-on (:cffi :bordeaux-threads :babel :serapeum)
  :serial t
  :components ((:file "mqueue")))
;; SOURCE:: https://gist.github.com/artob/5994998
;; Tweaks to:
;; - work
;; - specify message sizing
;; - easy string sending wrapper
;; - should be thread safe
;; note: queues opened by this fork are all R/W. This is needed by #'flush.
;; Package holding the message queue binding.  CL:ERROR is shadowed because
;; the package defines its own ERROR condition type.
(defpackage #:mqueue
  (:use #:cl #:cffi #:cffi-sys #+sbcl #:sb-alien)
  (:shadow #:error)
  (:export
   ;; library management
   #:load-library
   #:unload-library
   ;; queue lifecycle
   #:unlink-queue
   #:open-queue
   #:close-queue
   ;; pointer-level send / receive
   #:send-message-raw
   #:receive-message-raw
   #:send-message-raw-timed
   #:receive-message-raw-timed
   ;; string-level send / receive
   #:send-message
   #:receive-message
   #:send-message-timed
   #:receive-message-timed
   #:f-message
   #:flush
   ;; queue attributes
   #:get-attributes
   #:set-attributes
   #:non-blocking-p
   #:set-non-blocking
   #:with-unblocked))
(in-package :mqueue)
;; Association list of (queue-descriptor . octet-vector) receive buffers.
(defparameter *buffer* '())
(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; Candidate shared-library names for the library that provides mq_*.
  (define-foreign-library librt
    (:unix (:or "librt.so.0" "librt.so"))
    (:darwin (:or "librt.0.dylib" "librt.dylib"))
    (t (:default "librt")))
  ;; 'Thread-local buffers'
  ;; If several threads send/receive on the same queue they must not share
  ;; one buffer, or they would overwrite each other's data.  POSIX message
  ;; queues are designed for parallel use, so that property is kept here by
  ;; registering *BUFFER* in the bordeaux-threads default special bindings
  ;; (once only) with an initial value of NIL.
  (unless (some (lambda (binding)
                  (and (listp binding)
                       (eq '*buffer* (car binding))))
                bordeaux-threads:*default-special-bindings*)
    (push (cons '*buffer* '())
          bordeaux-threads:*default-special-bindings*)))
;; #include <fcntl.h>
;; (defparameter +O_RDONLY+ 00000000) ;; Linux: /usr/include/asm-generic/fcntl.h
;; (defparameter +O_WRONLY+ 00000001) ;; Linux: /usr/include/asm-generic/fcntl.h
;; (defparameter +O_RDWR+ 00000002) ;; Linux: /usr/include/asm-generic/fcntl.h
;; (defparameter +O_CREAT+ 00000100) ;; Linux: /usr/include/asm-generic/fcntl.h
;; Expands into a call to SB-ALIEN:GET-ERRNO, i.e. reading errno is
;; SBCL-specific in this file.
(defmacro errno ()
`(sb-alien:get-errno))
;; char* strerror(int errnum) -- used to build the message of a
;; FOREIGN-FUNCTION-ERROR when the caller supplies none.
(defcfun ("strerror" %%strerror) :string (errnum :int))
(defctype descriptor :int) ;; NOTE: platform-specific type
;; Foreign structure with two :long slots; instances are passed as the
;; abs-timeout argument of the timed send/receive wrappers.
;; NOTE(review): presumably mirrors C `struct timespec` -- confirm the field
;; widths on the target platform.
(defcstruct timespec (seconds :long) (nanoseconds :long))
;; Package-local ERROR condition (the package shadows CL:ERROR); it is the
;; parent of FOREIGN-FUNCTION-ERROR.
(define-condition error (cl:error) ())
;; Foreign structure with four :long slots, used with mq_open, mq_getattr and
;; mq_setattr.
;; NOTE(review): presumably mirrors C `struct mq_attr` -- confirm layout and
;; padding on the target platform.
(defcstruct mq_attr (mq_flags :long) (mq_maxmsg :long) (mq_msgsize :long) (mq_curmsgs :long))
(define-condition foreign-function-error (error)
  ((function :initarg :function :reader foreign-function-error-function)
   (code :initarg :code :reader foreign-function-error-code)
   (message :initarg :message :reader foreign-function-error-message))
  (:documentation
   "Signalled when a foreign function reports failure.
FUNCTION is the name of the failing C function, CODE the errno value and
MESSAGE a human-readable description.")
  (:report (lambda (condition stream)
             (with-accessors ((failed-function foreign-function-error-function)
                              (error-code foreign-function-error-code)
                              (description foreign-function-error-message))
                 condition
               (format stream "~A failed with error code ~A: ~A"
                       failed-function error-code description)))))
;; Specialisations selected by errno value (see FIND-FOREIGN-FUNCTION-ERROR-CLASS).
(define-condition unknown-pathname (foreign-function-error) ())    ;; ENOENT (2)
(define-condition bad-file-descriptor (foreign-function-error) ()) ;; EBADF (9)
(define-condition disallowed-access (foreign-function-error) ())   ;; EACCES (13)
(defun foreign-function-error (errno function-name &optional message)
  "Signal the condition that corresponds to ERRNO for the C function FUNCTION-NAME.
The condition class is chosen by FIND-FOREIGN-FUNCTION-ERROR-CLASS.  When
MESSAGE is not supplied (or is NIL) the text returned by strerror for ERRNO is
used instead.  This function does not return normally."
  (declare (type fixnum errno)
           (type string function-name))
  (let ((condition-class (find-foreign-function-error-class errno))
        (description (if message
                         message
                         (%%strerror errno))))
    (cl:error condition-class
              :function function-name
              :code errno
              :message description)))
(defun find-foreign-function-error-class (errno)
  "Return the name of the condition class used to report errno value ERRNO.
2, 9 and 13 map to dedicated classes; every other value maps to the generic
FOREIGN-FUNCTION-ERROR."
  (declare (type fixnum errno))
  (cond ((= errno 2) 'unknown-pathname)
        ((= errno 9) 'bad-file-descriptor)
        ((= errno 13) 'disallowed-access)
        (t 'foreign-function-error)))
(defmacro with-checked-ssize-result (cfun-name &body body)
  "Evaluate BODY and check its primary value as a C ssize_t-style result.
A non-negative result is returned as is.  A negative result signals a
FOREIGN-FUNCTION-ERROR built from the current errno and CFUN-NAME (a string
naming the C function, used only for the error report).

Only the PRIMARY value of BODY is checked and returned; any further values
produced by BODY are discarded."
  ;; A single gensym is enough: errno is read directly in the failure branch.
  (let ((result (gensym "RESULT")))
    `(let ((,result (progn ,@body)))
       (declare (type integer ,result))
       (if (minusp ,result)
           (foreign-function-error (errno) ,cfun-name)
           ,result))))
(defmacro with-checked-int-result (cfun-name &body body)
  "Same check as WITH-CHECKED-SSIZE-RESULT, for functions returning a C int."
  `(with-checked-ssize-result ,cfun-name ,@body))
(defun load-library (&key path version debug features)
  "Loads the POSIX message queue library.
Must be called before invoking any foreign functions in the library.

PATH, VERSION, DEBUG and FEATURES are accepted for interface compatibility
but are currently not used.  Returns no values."
  (declare (type boolean debug)
           (type list features)
           ;; The keyword arguments are part of the public signature but have
           ;; no effect yet; mark them so the compiler does not warn.
           (ignorable path version debug features))
  (load-foreign-library 'librt)
  (values)) ;;; no meaningful return value
(defun unload-library ()
  "Unloads the POSIX message queue library.  Returns no values."
  (close-foreign-library 'librt)
  (values)) ;;; no meaningful return value
;; int mq_unlink(const char* name)
(defcfun ("mq_unlink" %%unlink) :int
  (name :string))
(defun unlink-queue (queue-name)
  "Remove the message queue named QUEUE-NAME from the system.
Signals a FOREIGN-FUNCTION-ERROR when mq_unlink reports failure.
Returns no values."
  (declare (type string queue-name))
  (let ((status (%%unlink queue-name)))
    (with-checked-int-result "mq_unlink" status))
  (values)) ;;; no meaningful return value
(defun get-or-create-get-buffer (queue-descriptor &optional (size 8192))
  "Return an octet vector of at least SIZE elements associated with QUEUE-DESCRIPTOR.
Buffers are kept in the alist *BUFFER*.  An existing buffer is reused when it
is large enough; otherwise a fresh zero-filled buffer of SIZE octets is
allocated, stored for the descriptor, and returned."
  (flet ((fresh-buffer ()
           (make-array size :element-type '(unsigned-byte 8) :initial-element 0)))
    (let ((entry (assoc queue-descriptor *buffer*)))
      (cond ((null entry)
             ;; Add an entry for this descriptor while keeping the buffers
             ;; already registered for other descriptors.
             (let ((buffer (fresh-buffer)))
               (push (cons queue-descriptor buffer) *buffer*)
               buffer))
            ((< (length (cdr entry)) size)
             ;; Too small: replace it and hand back the NEW buffer, so callers
             ;; never receive fewer than SIZE octets.
             (setf (cdr entry) (fresh-buffer)))
            (t
             (cdr entry))))))
;; mqd_t mq_open(const char* name, int oflag, mode_t mode, struct mq_attr* attr)
(defcfun ("mq_open" %%open) descriptor (name :string) (oflag :int) (mode :int) (attr :pointer))
(defun open-queue (queue-name &optional nonblocking
                                ;; NOTE 0 does not mean 'use system default'.
                                (max-msg-size-bytes 8192 p1)
                                (max-msgs 10 p2))
  "Opens or creates a message queue and returns its descriptor.

QUEUE-NAME          name of the queue.
NONBLOCKING         when true, #o4000 is added to the open flags.
MAX-MSG-SIZE-BYTES  requested mq_msgsize.
MAX-MSGS            requested mq_maxmsg.

The attribute structure is passed to mq_open only when MAX-MSG-SIZE-BYTES or
MAX-MSGS is supplied explicitly; otherwise a null pointer is passed.
The queue is always opened with flags #o2 and #o100 and mode #o660.
Signals a FOREIGN-FUNCTION-ERROR when mq_open reports failure."
  (declare (type string queue-name))
  (with-foreign-object (attr '(:struct mq_attr))
    (with-foreign-slots ((mq_flags mq_maxmsg mq_msgsize mq_curmsgs) attr
                         (:struct mq_attr))
      ;; Initialise every field so no uninitialised memory is handed over.
      (setf mq_flags 0
            mq_maxmsg max-msgs
            mq_msgsize max-msg-size-bytes
            mq_curmsgs 0))
    (let ((mqdes (with-checked-int-result "mq_open"
                   (%%open queue-name
                           (logior #o2 #o100 (if nonblocking #o4000 0))
                           (logior #o200 #o400 #o020 #o040)
                           ;; ATTR already is the pointer to the structure.
                           (if (or p1 p2)
                               attr
                               (cffi:null-pointer))))))
      ;; Pre-allocate the receive buffer, large enough for the requested
      ;; message size when that exceeds the 8192-octet default.
      (get-or-create-get-buffer mqdes (max 8192 max-msg-size-bytes))
      mqdes)))
;; int mq_close(mqd_t mqdes)
(defcfun ("mq_close" %%close) :int
  (mqdes descriptor))
(defun close-queue (queue-descriptor)
  "Close the message queue descriptor QUEUE-DESCRIPTOR.
Signals a FOREIGN-FUNCTION-ERROR when mq_close reports failure.
Returns no values."
  (declare (type fixnum queue-descriptor))
  (let ((status (%%close queue-descriptor)))
    (with-checked-int-result "mq_close" status))
  (values)) ;;; no meaningful return value
;; int mq_send(mqd_t mqdes, const char* msg_ptr, size_t msg_len, unsigned msg_prio)
(defcfun ("mq_send" %%send) :int (mqdes descriptor) (msg-ptr :pointer) (msg-len :ulong) (msg-prio :uint))
;; int mq_timedsend(mqd_t mqdes, const char* msg_ptr, size_t msg_len,
;;                  unsigned msg_prio, const struct timespec* abs_timeout)
(defcfun ("mq_timedsend" %%timedsend) :int (mqdes descriptor) (msg-ptr :pointer) (msg-len :ulong)
  (msg-prio :uint) (abs-timeout :pointer))
(defun send-message-raw (queue-descriptor message-pointer message-size &key (message-priority 0))
  "Sends a message to a message queue.

QUEUE-DESCRIPTOR  descriptor of the queue.
MESSAGE-POINTER   foreign pointer to the first octet of the message.
MESSAGE-SIZE      number of octets to send.
MESSAGE-PRIORITY  non-negative integer priority; NIL is treated as 0.

Signals a FOREIGN-FUNCTION-ERROR when mq_send reports failure.
Returns no values."
  (declare (type fixnum queue-descriptor message-size)
           (type foreign-pointer message-pointer)
           (type (or fixnum null) message-priority))
  ;; The message is read straight from MESSAGE-POINTER, so no receive buffer
  ;; has to be looked up or pinned here.
  (with-checked-int-result "mq_send"
    (%%send queue-descriptor message-pointer message-size
            ;; The declaration admits NIL, but the foreign argument is :uint.
            (or message-priority 0)))
  (values))
(defun send-message (queue-descriptor message &key (message-priority 0))
  "Send the string MESSAGE to the queue QUEUE-DESCRIPTOR.
MESSAGE is encoded with CFFI's default foreign encoding and sent without a
terminating NUL.  MESSAGE-PRIORITY is passed on to SEND-MESSAGE-RAW."
  (declare (dynamic-extent message))
  ;; OCTET-COUNT is the size of the ENCODED message.  (length message) counts
  ;; characters and would truncate text containing multi-octet characters.
  ;; With :NULL-TERMINATED-P NIL the reported size is exactly the payload.
  (with-foreign-string ((payload octet-count) message :null-terminated-p nil)
    (send-message-raw queue-descriptor payload octet-count
                      :message-priority message-priority)))
(defun send-message-raw-timed (queue-descriptor message-pointer message-size
                               &key (message-priority 0) timeout)
  "Sends a message to a message queue, giving up after TIMEOUT seconds.

QUEUE-DESCRIPTOR  descriptor of the queue.
MESSAGE-POINTER   foreign pointer to the first octet of the message.
MESSAGE-SIZE      number of octets to send.
MESSAGE-PRIORITY  non-negative integer priority; NIL is treated as 0.
TIMEOUT           required; a real number of seconds (fractions allowed),
                  measured from the moment of the call.

Returns the (non-negative) result of mq_timedsend.
Signals a FOREIGN-FUNCTION-ERROR when mq_timedsend reports failure."
  (declare (type fixnum queue-descriptor message-size)
           (type foreign-pointer message-pointer)
           (type (or fixnum null) message-priority)
           (type real timeout))
  (multiple-value-bind (whole-seconds fraction) (floor timeout)
    (multiple-value-bind (now-seconds now-microseconds) (sb-ext:get-time-of-day)
      ;; Absolute deadline = now + timeout.  Work in integer nanoseconds and
      ;; carry any overflow into the seconds field so that the nanoseconds
      ;; field stays within [0, 1e9).
      (multiple-value-bind (carry-seconds deadline-nanoseconds)
          (floor (+ (* now-microseconds 1000)
                    (floor (* fraction 1000000000)))
                 1000000000)
        (with-foreign-object (deadline '(:struct timespec))
          (with-foreign-slots ((seconds nanoseconds) deadline (:struct timespec))
            (setf seconds (+ now-seconds whole-seconds carry-seconds)
                  nanoseconds deadline-nanoseconds))
          (with-checked-int-result "mq_timedsend"
            (%%timedsend queue-descriptor message-pointer message-size
                         (or message-priority 0) deadline)))))))
(defun send-message-timed (queue-descriptor message
                           &key (message-priority 0) timeout)
  "Send the string MESSAGE to the queue QUEUE-DESCRIPTOR with a timeout.
MESSAGE is encoded with CFFI's default foreign encoding and sent without a
terminating NUL.  MESSAGE-PRIORITY and TIMEOUT are passed on to
SEND-MESSAGE-RAW-TIMED, whose result is returned."
  (declare (dynamic-extent message)
           (fixnum message-priority)
           (string message))
  ;; OCTET-COUNT is the size of the ENCODED message.  (length message) counts
  ;; characters and would truncate text containing multi-octet characters.
  ;; With :NULL-TERMINATED-P NIL the reported size is exactly the payload.
  (with-foreign-string ((payload octet-count) message :null-terminated-p nil)
    (send-message-raw-timed queue-descriptor payload octet-count
                            :message-priority message-priority
                            :timeout timeout)))
;; ssize_t mq_receive(mqd_t mqdes, char* msg_ptr, size_t msg_len, unsigned* msg_prio)
(defcfun ("mq_receive" %%receive) :long (mqdes descriptor) (msg-ptr :pointer) (msg-len :ulong) (msg-prio :pointer))
;; ssize_t mq_timedreceive(mqd_t mqdes, char* msg_ptr, size_t msg_len,
;;                         unsigned* msg_prio, const struct timespec* abs_timeout)
(defcfun ("mq_timedreceive" %%timedreceive) :long (mqdes descriptor) (msg-ptr :pointer)
  (msg-len :ulong) (msg-prio :pointer) (abs-timeout :pointer))
(defun receive-message-raw (queue-descriptor message-pointer message-size)
  "Receives a message from a message queue.

QUEUE-DESCRIPTOR  descriptor of the queue.
MESSAGE-POINTER   foreign pointer to the memory that receives the message.
MESSAGE-SIZE      capacity, in octets, of that memory.

Returns two values: the number of bytes in the received message and the
priority of the message.
Signals a FOREIGN-FUNCTION-ERROR when mq_receive reports failure."
  (declare (type fixnum queue-descriptor message-size)
           (type foreign-pointer message-pointer))
  (with-foreign-object (priority :uint)
    ;; The checking macro keeps only the primary value of its body, so the
    ;; priority is read OUTSIDE of it.  VALUES evaluates its arguments left to
    ;; right: the receive call has completed before the priority is read.
    (values (with-checked-ssize-result "mq_receive"
              (%%receive queue-descriptor message-pointer message-size priority))
            (mem-aref priority :uint))))
(defun receive-message (queue-descriptor)
  "Receive one message from QUEUE-DESCRIPTOR and return it as a string.
The message is read into the buffer associated with the descriptor; the
octets received are decoded with BABEL:OCTETS-TO-STRING.  The second return
value is the second value produced by RECEIVE-MESSAGE-RAW."
  (declare (type fixnum queue-descriptor))
  (let* ((octets (get-or-create-get-buffer queue-descriptor))
         (capacity (length octets)))
    (with-pointer-to-vector-data (address octets)
      (multiple-value-bind (received-size priority)
          (receive-message-raw queue-descriptor address capacity)
        ;; Decode only the octets that belong to this message.
        (values (babel:octets-to-string octets :end received-size)
                priority)))))
(defun receive-message-raw-timed (queue-descriptor message-pointer message-size
                                  &key timeout)
  "Receives a message from a message queue, giving up after TIMEOUT seconds.

QUEUE-DESCRIPTOR  descriptor of the queue.
MESSAGE-POINTER   foreign pointer to the memory that receives the message.
MESSAGE-SIZE      capacity, in octets, of that memory.
TIMEOUT           required; a real number of seconds (fractions allowed),
                  measured from the moment of the call.

Returns two values: the number of bytes in the received message and the
priority of the message.
Signals a FOREIGN-FUNCTION-ERROR when mq_timedreceive reports failure."
  (declare (type fixnum queue-descriptor message-size)
           (type foreign-pointer message-pointer)
           (type real timeout))
  (multiple-value-bind (whole-seconds fraction) (floor timeout)
    ;; Same clock source as SEND-MESSAGE-RAW-TIMED.  It also yields the
    ;; current microseconds; a whole-second clock could make the deadline up
    ;; to one second too early.
    (multiple-value-bind (now-seconds now-microseconds) (sb-ext:get-time-of-day)
      ;; Absolute deadline = now + timeout.  Work in integer nanoseconds and
      ;; carry any overflow into the seconds field so that the nanoseconds
      ;; field stays within [0, 1e9).
      (multiple-value-bind (carry-seconds deadline-nanoseconds)
          (floor (+ (* now-microseconds 1000)
                    (floor (* fraction 1000000000)))
                 1000000000)
        (with-foreign-objects ((deadline '(:struct timespec))
                               (prio :uint))
          (with-foreign-slots ((seconds nanoseconds) deadline (:struct timespec))
            (setf seconds (+ now-seconds whole-seconds carry-seconds)
                  nanoseconds deadline-nanoseconds))
          ;; The checking macro keeps only the primary value of its body, so
          ;; the priority is read outside of it, after the call completed.
          (values (with-checked-ssize-result "mq_timedreceive"
                    (%%timedreceive queue-descriptor message-pointer
                                    message-size prio deadline))
                  (mem-aref prio :uint)))))))
(defun receive-message-timed (queue-descriptor message-size
                              &key timeout)
  "Receive one message from QUEUE-DESCRIPTOR as a string, with a timeout.

MESSAGE-SIZE  capacity, in octets, to offer for the incoming message.
TIMEOUT       passed on to RECEIVE-MESSAGE-RAW-TIMED.

Returns the decoded message as the first value; the second value is the
second value produced by RECEIVE-MESSAGE-RAW-TIMED."
  (declare (type fixnum queue-descriptor message-size))
  (let ((buf (get-or-create-get-buffer queue-descriptor message-size)))
    (with-pointer-to-vector-data (ptr buf)
      (multiple-value-bind (received-size prio)
          ;; TIMEOUT is a keyword parameter of the raw function.  Never
          ;; announce more capacity than the buffer really has.
          (receive-message-raw-timed queue-descriptor ptr
                                     (min message-size (length buf))
                                     :timeout timeout)
        ;; Decode only the octets of this message, not the whole buffer
        ;; (which may still hold data from earlier, longer messages).
        (values (babel:octets-to-string buf :end received-size) prio)))))
;; mq_getattr returns a C int, so the return type is :int.  That keeps a
;; failure result negative for the check in GET-ATTRIBUTES.
(defcfun ("mq_getattr" %%getattr) :int
  "int mq_getattr(mqd_t mqdes, struct mq_attr* attr)"
  (mqdes descriptor)
  (attr (:pointer (:struct mq_attr))))
(defun get-attributes (queue-descriptor)
  "Return the attributes of the queue QUEUE-DESCRIPTOR as four values:
mq_flags, mq_maxmsg, mq_msgsize and mq_curmsgs.
Signals a FOREIGN-FUNCTION-ERROR when mq_getattr reports failure."
  (with-foreign-object (attr '(:struct mq_attr))
    ;; Without this check a failed call would leave ATTR unfilled and the
    ;; values below would be whatever happened to be in memory.
    (with-checked-int-result "mq_getattr"
      (%%getattr queue-descriptor attr))
    (with-foreign-slots ((mq_flags mq_maxmsg mq_msgsize mq_curmsgs) attr
                         (:struct mq_attr))
      (values mq_flags mq_maxmsg mq_msgsize mq_curmsgs))))
(defun non-blocking-p (queue-descriptor)
  "Return true when the non-blocking flag (#o4000) is set in the flags of
QUEUE-DESCRIPTOR, false otherwise."
  ;; Query the descriptor that was passed in, and test only the flag bit so
  ;; that other bits in the flags word cannot influence the answer.
  (logtest #o4000 (nth-value 0 (get-attributes queue-descriptor))))
;; mq_setattr returns a C int, so the return type is :int.  That keeps a
;; failure result negative for the check in SET-ATTRIBUTES.
(defcfun ("mq_setattr" %%setattr) :int
  "int mq_setattr(mqd_t mqdes, const struct mq_attr* newattr, struct mq_attr* oldattr)"
  (mqdes descriptor)
  (attr (:pointer (:struct mq_attr)))
  (oattr (:pointer (:struct mq_attr))))
;; Note: as per the manual, the only thing that can be changed
;; is the non-blocking flag
(defun set-attributes (queue-descriptor flags)
  "Set the flags of the queue QUEUE-DESCRIPTOR to FLAGS.
Returns the (non-negative) result of mq_setattr.
Signals a FOREIGN-FUNCTION-ERROR when mq_setattr reports failure."
  (with-foreign-objects ((new-attributes '(:struct mq_attr))
                         (old-attributes '(:struct mq_attr)))
    (with-foreign-slots ((mq_flags mq_maxmsg mq_msgsize mq_curmsgs) new-attributes
                         (:struct mq_attr))
      ;; Only the flags matter (see the note above); the remaining fields are
      ;; zeroed so no uninitialised memory is handed to the C function.
      (setf mq_flags flags
            mq_maxmsg 0
            mq_msgsize 0
            mq_curmsgs 0))
    (with-checked-int-result "mq_setattr"
      (%%setattr queue-descriptor new-attributes old-attributes))))
(defun set-non-blocking (queue-descriptor non-blocking)
  "Switch the queue QUEUE-DESCRIPTOR to non-blocking mode when NON-BLOCKING is
true, and clear the flags otherwise.  Returns what SET-ATTRIBUTES returns."
  (let ((flags (if non-blocking
                   #o4000
                   0)))
    (set-attributes queue-descriptor flags)))
(defmacro with-unblocked (queue-descriptor &body body)
  "Evaluate BODY with the queue QUEUE-DESCRIPTOR in non-blocking mode.
QUEUE-DESCRIPTOR is a form, evaluated exactly once.  If the queue was in
blocking mode before, it is switched to non-blocking mode for the duration of
BODY and switched back afterwards, even when BODY exits non-locally.  A queue
that already was non-blocking is left untouched.
Returns the values of BODY."
  (let ((mqdes (gensym "MQDES"))
        (was-non-blocking (gensym "WAS-NON-BLOCKING")))
    ;; The descriptor form is spliced in (and bound once) rather than relying
    ;; on the caller having a variable that happens to be called
    ;; QUEUE-DESCRIPTOR.
    `(let* ((,mqdes ,queue-descriptor)
            (,was-non-blocking (non-blocking-p ,mqdes)))
       (unless ,was-non-blocking
         (set-non-blocking ,mqdes t))
       (unwind-protect
            (progn ,@body)
         ;; Restore blocking mode only if this form changed it.
         (unless ,was-non-blocking
           (set-non-blocking ,mqdes nil))))))
(defun flush (queue-descriptor)
  "Small hack to discard the messages currently waiting in QUEUE-DESCRIPTOR.
Messages are persistent unless ALL programs are disconnected from the queue,
no matter what you did.

Messages are received and thrown away inside WITH-UNBLOCKED until a receive
signals a FOREIGN-FUNCTION-ERROR, at which point the loop stops."
  (with-unblocked queue-descriptor
    (handler-case
        (loop
          (receive-message queue-descriptor))
      (foreign-function-error ()
        t))))
(defun f-message (queue-descriptor message)
  "Like SEND-MESSAGE, but if the queue is full, forces this message onto it.
When the number of queued messages has reached the queue's maximum, one
message is received (and dropped) inside WITH-UNBLOCKED before MESSAGE is
sent.  Returns what SEND-MESSAGE returns."
  (multiple-value-bind (flags capacity message-size pending)
      (get-attributes queue-descriptor)
    (declare (ignore flags message-size))
    (when (>= pending capacity)
      ;; Make room by taking one message off the queue.
      (with-unblocked queue-descriptor
        (receive-message queue-descriptor)))
    (send-message queue-descriptor message)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment