Skip to content

Instantly share code, notes, and snippets.

@death
Last active December 19, 2017 19:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save death/9ebfb1ae524c9c3dcdf780762ce0362d to your computer and use it in GitHub Desktop.
Save death/9ebfb1ae524c9c3dcdf780762ce0362d to your computer and use it in GitHub Desktop.
;; Inspired by
;; https://mailman.common-lisp.net/pipermail/pro/2016-July/001390.html
(defpackage #:snippets/processes
(:use #:cl)
(:import-from #:alexandria #:shuffle))
(in-package #:snippets/processes)
(defvar *pending-processes* '()
  "Processes that have been scheduled and are waiting for their turn to
run.")
(defvar *pending-messages* nil
  "Table keyed by process name whose values are the lists of messages
still waiting to be delivered to that process.")
(defclass process ()
  ((name
    :reader process-name
    :initarg :name
    :documentation "Name identifying the process; used as its mailbox key.")
   (message-handler
    :reader process-message-handler
    :initarg :message-handler
    :documentation "Function of one argument called with each message."))
  (:documentation "A named unit of work driven by a message handler."))
(defgeneric handle-message (process message)
  (:documentation "Deliver MESSAGE to PROCESS and let it act on it.")
  (:method ((process process) message)
    ;; The default behaviour is to call the handler function stored in
    ;; the process with the message as its only argument.
    (let ((handler (process-message-handler process)))
      (funcall handler message))))
(defun receive (process)
  "Remove and return one message waiting for PROCESS, or NIL when its
mailbox is empty."
  (let* ((mailbox-key (process-name process))
         (waiting (gethash mailbox-key *pending-messages*)))
    ;; Hand back the head of the list and store the tail as the new
    ;; mailbox contents.  The tail is stored even when the mailbox was
    ;; empty, exactly as POP on the GETHASH place would do.
    (prog1 (first waiting)
      (setf (gethash mailbox-key *pending-messages*) (rest waiting)))))
(defclass message ()
  ((sender
    :accessor message-sender
    :initarg :sender
    :documentation "Designator of whoever posted the message.")
   (payload
    :accessor message-payload
    :initarg :payload
    :documentation "The data carried by the message."))
  (:documentation "An envelope pairing a payload with its sender."))
(defmethod message-payload ((no-message null))
  "The payload of NIL (no message at all) is NIL."
  nil)
(defvar *current-process* nil
  "The process whose turn it currently is, or NIL when no process is
running.")
;; The scheduler binds this to the sender slot of the message it is about
;; to deliver, so it holds a process *name* (or the symbol SCHEDULER for
;; the empty message the scheduler makes up), not a process object.
(defvar *message-source* nil
  "The sender of the message currently being handled.  RESPOND posts
its reply to this designator.")
(defun post (process-designator message)
  "Post MESSAGE to the designated process.

PROCESS-DESIGNATOR is either a PROCESS instance or a process name.  The
payload is wrapped in a MESSAGE whose sender is the name of the current
process, or NIL when posting from outside any process.  Returns the
recipient's updated list of pending messages."
  (let ((recipient (if (typep process-designator 'process)
                       (process-name process-designator)
                       process-designator))
        (envelope (make-instance 'message
                                 :sender (and *current-process*
                                              (process-name *current-process*))
                                 :payload message)))
    ;; Add at the tail rather than the head: RECEIVE takes messages from
    ;; the front of this list, so appending delivers them in the order
    ;; they were posted and an old message can never be starved by a
    ;; stream of newer ones.
    (setf (gethash recipient *pending-messages*)
          (append (gethash recipient *pending-messages*)
                  (list envelope)))))
(defun respond (response)
  "Send RESPONSE back to whoever sent the message now being handled."
  (let ((originator *message-source*))
    (post originator response)))
(defun scheduler (&key initial)
  "Run the processes in INITIAL, round after round, until none is left
pending or one of them calls HALT.  Returns NIL."
  (let ((*pending-processes* (copy-list initial))
        (*pending-messages* (make-hash-table :test 'eq)))
    ;; HALT throws to this tag to stop the whole machine.
    (catch 'boom
      (loop
        ;; Take everything pending as this round's agenda, in random
        ;; order, and start collecting the next round from scratch.
        (let ((agenda (shuffle (copy-list *pending-processes*))))
          (setf *pending-processes* '())
          ;; Nobody asked to run again: we are done.
          (when (null agenda)
            (return))
          (dolist (*current-process* agenda)
            ;; Every process gets exactly one message per round; when
            ;; its mailbox is empty the scheduler makes up an empty one.
            (let* ((message (or (receive *current-process*)
                                (make-instance 'message
                                               :sender 'scheduler
                                               :payload nil)))
                   (*message-source* (message-sender message)))
              (handle-message *current-process*
                              (message-payload message)))))))))
(defun schedule (process)
  "Put PROCESS at the front of the list of pending processes and
return the updated list."
  (setf *pending-processes* (cons process *pending-processes*)))
(defun spawn (process-name &rest args)
  "Create a process by calling the function named PROCESS-NAME with
ARGS, and schedule the result to run."
  (let ((new-process (apply process-name args)))
    (schedule new-process)))
(defun spawn-one (process-name)
  "Spawn a process named PROCESS-NAME unless one is already pending.

Only the list of pending processes is consulted.  The scheduler empties
that list at the start of every round, so a process that is part of the
current round but has not rescheduled itself yet is not seen here."
  (when (notany (lambda (pending)
                  (eql process-name (process-name pending)))
                *pending-processes*)
    (spawn process-name)))
(defun respawn ()
  "Put the current process back on the list of pending processes so
that it runs again."
  (let ((self *current-process*))
    (schedule self)))
(defun halt ()
  "Stop the scheduler at once by throwing to its BOOM catch tag."
  ;; Control never returns to the caller; the enclosing CATCH yields NIL.
  (throw 'boom nil))
(defun exit ()
  "Exit the currently running process.

This global definition always signals an error.  Inside a DEFPROCESS
body a local function of the same name shadows it."
  (error 'simple-error
         :format-control "There is no currently running process."
         :format-arguments '()))
;; DEFPROCESS defines a function called NAME that builds a PROCESS
;; instance.
;;
;;   NAME         names both the constructor function and the process.
;;   STATE-VARS   become the constructor's &KEY parameters; the message
;;                handler closes over them, so they persist from one
;;                invocation of the handler to the next.
;;   MESSAGE-VAR  is the handler's single parameter; a fresh uninterned
;;                symbol is used when it is omitted.
;;   FORMS        are the handler body.  A leading string is removed from
;;                them and used as the constructor's docstring.
;;
;; Inside FORMS two local operators are available:
;;   (EXIT)               leaves the handler without rescheduling.
;;   (INITIALLY form...)  runs the forms only on the first invocation.
(defmacro defprocess (name (&rest state-vars) (&optional message-var) &body forms)
"Define a process."
;; FIRST-TIME names the closure variable that records whether the handler
;; has completed a run yet.
(let ((first-time (gensym))
(docstring (and (stringp (car forms)) (pop forms))))
(when (null message-var)
(setf message-var (gensym)))
`(progn
(defun ,name (&key ,@state-vars)
,@(when docstring (list docstring))
(make-instance 'process
:name ',name
:message-handler
(let ((,first-time t))
(lambda (,message-var)
(declare (ignorable ,message-var))
;; The block gives the local EXIT something to return from.  The
;; RESPAWN call below sits inside it, so exiting skips it.
(block ,name
(flet ((exit ()
(return-from ,name)))
(declare (ignorable #'exit))
;; In the nested backquote, ,',first-time inserts the gensym chosen by
;; DEFPROCESS, while ,@body is filled in when INITIALLY is expanded.
(macrolet ((initially (&body body)
`(when ,',first-time
,@body)))
,@forms)
;; Reached only when FORMS finished without calling EXIT: clear the
;; first-run flag and ask to be run again.
(when ,first-time
(setf ,first-time nil))
(respawn))))))))))
(defprocess add-client ((num-requests 0)) (res)
  "Starts an add server if needed, sends it addition requests and
prints the answers that come back."
  (initially (spawn-one 'add-server))
  (if res
      ;; A reply arrived: show it.
      (format t "~A~%" res)
      ;; Nothing to read this round: send another request, and stop the
      ;; whole machine once ten requests have gone out.
      (progn
        (post 'add-server (list (random 10) (random 10)))
        (incf num-requests)
        (unless (< num-requests 10)
          (halt)))))
(defprocess add-server () (msg)
  "Answers a request holding two numbers with a list of both numbers
followed by their sum."
  (unless (null msg)
    (destructuring-bind (left right) msg
      (let ((total (+ left right)))
        (respond (list left right total))))))
(defun add-client-example ()
  "Run the scheduler with a single add client, demonstrating a simple
client-server exchange."
  (let ((client (add-client)))
    (scheduler :initial (list client))))
(defprocess ping () (ball)
  "Pass the ball on to pong; serves the ball on the first run."
  ;; On the very first invocation there is no ball yet, so make one.
  (initially (setf ball 'ball))
  (unless (null ball)
    (print 'ping)
    (post 'pong ball)
    (setf ball nil)))
(defprocess pong () (ball)
  "Pass the ball back to ping whenever it arrives."
  (unless (null ball)
    (print 'pong)
    (post 'ping ball)
    (setf ball nil)))
(defprocess timebomb ((timer 10)) ()
  "Count TIMER down by one on every run and halt the scheduler once it
is no longer positive."
  (decf timer)
  (unless (plusp timer)
    (halt)))
(defun ping-pong-example ()
  "Run ping and pong against each other, with a timebomb to end the
game; demonstrates communication between peers."
  (let ((players (list (ping) (pong) (timebomb))))
    (scheduler :initial players)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment