Skip to content

Instantly share code, notes, and snippets.

@Goheeca
Last active February 21, 2021 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Goheeca/d91e62fe50fcd798a8176fe6a38aca3b to your computer and use it in GitHub Desktop.
Save Goheeca/d91e62fe50fcd798a8176fe6a38aca3b to your computer and use it in GitHub Desktop.
Pausable Threads
#!/usr/bin/sbcl --script
(load "~/.sbclrc")
(ql:quickload '("bordeaux-threads") :silent t)
(defun color-formatter (color)
(format nil "~a[~am~~?~a[m" #\Esc color #\Esc))
(defvar *output-lock* (bt:make-lock "OUTPUT LOCK"))
(defmacro safe-format (stream fmt &rest args)
`(bt:with-lock-held (*output-lock*)
(format ,stream ,fmt ,@args)
(force-output)))
(defvar *progress-monitor* nil)
(defvar *current-pausable-thread* nil)
(defclass synchronized-value ()
((sv-name
:initarg :name
:accessor sv-name)
(sv-value
:initarg :value
:accessor sv-value)
(sv-guard-lock
:initform nil
:accessor sv-guard-lock)))
(defmethod initialize-instance :after ((sv synchronized-value) &key)
(setf (sv-guard-lock sv) (bt:make-recursive-lock (concatenate 'string "guard lock of " (sv-name sv)))))
(defun make-synchronized-value (value name)
(make-instance 'synchronized-value :value value :name name))
(defmacro with-synchronized-value ((sv) &body body)
`(bt:with-recursive-lock-held ((sv-guard-lock ,sv))
,@body))
(defclass pausable-thread ()
((name
:initarg :name
:accessor name)
(function
:initarg :thread-function
:accessor thread-function)
(thread
:initform nil
:accessor thread)
(thread-lock
:initform nil
:accessor thread-lock)
(signal-lock
:initform nil
:accessor signal-lock)
(paused
:initform nil
:accessor paused)
(pausable
:initform t
:accessor pausable)
(pausable-condition
:initform nil
:accessor pausable-condition)
(pausing
:initform nil
:accessor pausing)
(pausing-condition
:initform nil
:accessor pausing-condition)
(unpausing
:initform nil
:accessor unpausing)
(unpausing-condition
:initform nil
:accessor unpausing-condition)
(color
:initarg :color
:initform ""
:accessor color)))
(defmethod initialize-instance :after ((pt pausable-thread) &key)
(setf (thread-lock pt) (bt:make-recursive-lock (concatenate 'string "thread lock of " (name pt))))
(setf (signal-lock pt) (bt:make-lock (concatenate 'string "signal lock of " (name pt))))
(setf (pausable-condition pt)
(bt:make-condition-variable :name (concatenate 'string "pausable condition of " (name pt))))
(setf (pausing-condition pt)
(bt:make-condition-variable :name (concatenate 'string "pausing condition of " (name pt))))
(setf (unpausing-condition pt)
(bt:make-condition-variable :name (concatenate 'string "unpausing condition of " (name pt))))
(setf (thread pt) (bt:make-thread #'(lambda ()
(let ((*current-pausable-thread* pt))
(funcall (thread-function pt))))
:name (name pt))))
(defun make-pausable-thread (function name &key color)
(make-instance 'pausable-thread :thread-function function :name name :color color))
(defun join-pausable-thread (pt)
(bt:join-thread (thread pt)))
(defun thread-formatting ()
(color-formatter (if *current-pausable-thread*
(color *current-pausable-thread*)
"31;1")))
(defmacro wait%% (type (condition test lock) &body body)
`(,type (,lock)
(loop until ,test
do (bt:condition-wait ,condition ,lock))
,@body))
(defmacro wait% ((condition test lock) &body body)
`(wait%% bt:with-lock-held (,condition ,test ,lock) ,@body))
(defmacro wait-pausable ((pt) &body body)
`(wait%% bt:with-recursive-lock-held ((pausable-condition ,pt) (pausable ,pt) (thread-lock ,pt))
,@body))
(defmacro notify%% (type (condition test lock) &body body)
`(,type (,lock)
(setf ,test (progn ,@body))
(bt:condition-notify ,condition)))
(defmacro notify% ((condition test lock) &body body)
`(notify%% bt:with-lock-held (,condition ,test ,lock) ,@body))
(defun unpause (pt)
(notify% ((unpausing-condition pt) (unpausing pt) (signal-lock pt)) t))
(defun set-pausable (val)
(notify%% bt:with-recursive-lock-held ((pausable-condition *current-pausable-thread*) (pausable *current-pausable-thread*) (thread-lock *current-pausable-thread*)) val))
(defmacro pause%% ((condition test lock) (&body pre-pause) (&body post-pause))
`(progn
,@pre-pause
(wait% (,condition ,test ,lock)
,@post-pause)))
(defun pause% ()
(pause%% ((unpausing-condition *current-pausable-thread*) (unpausing *current-pausable-thread*) (signal-lock *current-pausable-thread*))
((safe-format t "~?" (thread-formatting) `("~a" ("Pausing.")))
(setf (paused *current-pausable-thread*) t)
(notify% ((pausing-condition *current-pausable-thread*) (pausing *current-pausable-thread*) (signal-lock *current-pausable-thread*)) t))
((setf (unpausing *current-pausable-thread*) nil)
(safe-format t "~?" (thread-formatting) `("~a" ("Unpausing.")))
(setf (paused *current-pausable-thread*) nil))))
(defmacro with-paused-p ((var pt) &body body)
`(bt:with-recursive-lock-held ((thread-lock ,pt))
(let ((,var (paused ,pt)))
,@body)))
(defun pause (pt)
(bt:interrupt-thread (thread pt) #'pause%)
(wait% ((pausing-condition pt) (pausing pt) (signal-lock pt))
(setf (pausing pt) nil)))
(defun hello ()
(safe-format t "~&Hello from ~?.~%" (thread-formatting) `("~a" (,(bt:thread-name (bt:current-thread))))))
(defun status (count)
(safe-format t "~?" (thread-formatting) `("~a." (,count))))
(defmacro unpausable (&body body)
`(unwind-protect
(progn
(set-pausable nil)
,@body)
(set-pausable t)))
(set-dispatch-macro-character #\# #\! #'(lambda (s c a)
(declare (ignore c a))
`(unpausable ,(read s t nil t))))
(defun worker (&optional (delay 1))
(let ((i 0))
(hello)
(loop
(sleep delay)
#!(with-synchronized-value (*progress-monitor*)
(setf (gethash (name *current-pausable-thread*) (sv-value *progress-monitor*)) i)
(status i))
(incf i))))
(defun main ()
(let ((workers (list
(make-pausable-thread #'(lambda () (worker 0.01)) "worker A" :color "32;1")
(make-pausable-thread #'(lambda () (worker 0.02)) "worker B" :color "33;1")
(make-pausable-thread #'(lambda () (worker 0.015)) "worker C" :color "34;1"))))
(setf *progress-monitor* (make-synchronized-value (make-hash-table) "progress monitor"))
(loop for worker in workers
do (setf (gethash (name worker) (sv-value *progress-monitor*)) nil))
(hello)
(loop
(sleep (/ (random 10) 10))
(let ((who (nth (random (length workers)) workers)))
(with-paused-p (is-paused who)
(if is-paused
(progn
(safe-format t "~?" (thread-formatting) `("Unpause ~a." (,(name who))))
(unpause who))
(progn
(safe-format t "~?" (thread-formatting) `("Pause ~a." (,(name who))))
(with-synchronized-value (*progress-monitor*)
(safe-format t "~?" (thread-formatting)
`("~{~{~a~^: ~}~^, ~}." (,(loop for k being the hash-key using (hash-value v) of (sv-value *progress-monitor*) collect `(,k ,v))))))
(wait-pausable (who)
(pause who)))))))
(loop for worker in workers do (join-pausable-thread worker))))
(handler-case
(main)
(sb-sys:interactive-interrupt () (fresh-line) (sb-ext:exit)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment