Last active
December 19, 2017 19:37
-
-
Save death/9ebfb1ae524c9c3dcdf780762ce0362d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Inspired by
;; https://mailman.common-lisp.net/pipermail/pro/2016-July/001390.html
(defpackage #:snippets/processes | |
(:use #:cl) | |
(:import-from #:alexandria #:shuffle)) | |
(in-package #:snippets/processes) | |
(defvar *pending-processes* '()
  "Processes queued to run in the scheduler's next round.
SCHEDULE pushes onto this list; SCHEDULER rebinds it for the duration
of a run and empties it at the start of every round.")
(defvar *pending-messages* nil
  "Mailbox table mapping a process name to the list of messages waiting
for that process.  NIL at top level; SCHEDULER binds it to a fresh EQ
hash table for the duration of a run.")
(defclass process ()
  ((name
    :initarg :name
    :reader process-name
    :documentation "Name identifying the process; POST and RECEIVE use
it as the mailbox key.")
   (message-handler
    :initarg :message-handler
    :reader process-message-handler
    :documentation "Function of one argument that HANDLE-MESSAGE calls
with each delivered message."))
  (:documentation "A schedulable unit: a name plus a message-handling
function."))
(defgeneric handle-message (process message)
  (:documentation "Tell the process to handle a message."))

(defmethod handle-message ((process process) message)
  "Default delivery: call PROCESS's stored handler function on MESSAGE
and return whatever the handler returns."
  (let ((handler (process-message-handler process)))
    (funcall handler message)))
(defun receive (process)
  "Remove and return the message at the front of PROCESS's mailbox, or
NIL when the mailbox is empty.  The mailbox is looked up in
*PENDING-MESSAGES* under the process's name."
  (let ((mailbox-key (process-name process)))
    (pop (gethash mailbox-key *pending-messages*))))
(defclass message ()
  ((sender
    :initarg :sender
    :accessor message-sender
    :documentation "Name of whoever posted the message.")
   (payload
    :initarg :payload
    :accessor message-payload
    :documentation "The value handed to the recipient's message handler."))
  (:documentation "An envelope pairing a payload with the name of its
sender."))
(defmethod message-payload ((object null))
  "Treat NIL (no message at all) as a message with a NIL payload."
  nil)
(defvar *current-process* nil
  "The process whose handler is currently running, or NIL when no
process is running.  SCHEDULER binds it while it walks each round's
agenda; POST reads it to stamp outgoing messages with a sender.")
(defvar *message-source* nil
  "The name of the sender of the message currently being handled.
SCHEDULER binds this to the message's sender around each handler call:
a process name, NIL for a message posted while no process was running,
or the symbol SCHEDULER for the empty message delivered to a process
with nothing in its mailbox.  RESPOND posts its reply to this name.")
(defun post (process-designator message)
  "Post MESSAGE to the designated process's mailbox.

PROCESS-DESIGNATOR is either a PROCESS instance or a process name.
MESSAGE is wrapped in a MESSAGE object whose sender is the name of
*CURRENT-PROCESS*, or NIL when no process is running.  Messages are
queued oldest-first.  Returns the recipient's updated mailbox list."
  (let ((recipient (if (typep process-designator 'process)
                       (process-name process-designator)
                       process-designator))
        (envelope (make-instance 'message
                                 :sender (and *current-process*
                                              (process-name *current-process*))
                                 :payload message)))
    ;; Append rather than push: RECEIVE pops from the front of the
    ;; list, so adding at the tail delivers messages in posting order.
    ;; Pushing at the head turned the mailbox into a stack, which
    ;; reversed delivery order and let a steady stream of new messages
    ;; starve an older one indefinitely.
    (setf (gethash recipient *pending-messages*)
          (nconc (gethash recipient *pending-messages*)
                 (list envelope)))))
(defun respond (response)
  "Post RESPONSE back to whoever sent the message currently being
handled, as recorded in *MESSAGE-SOURCE*."
  (let ((originator *message-source*))
    (post originator response)))
(defun scheduler (&key initial)
  "Run processes in rounds until none are pending or HALT is called.

INITIAL is the list of processes to run in the first round.  Each
round takes every pending process, in shuffled order, and hands it one
message: the next one from its mailbox, or a message with a NIL
payload and sender SCHEDULER when the mailbox is empty.  Processes
that want to run again must reschedule themselves while being handled.
Returns NIL."
  (let ((*pending-processes* (copy-list initial))
        (*pending-messages* (make-hash-table :test 'eq)))
    ;; HALT throws to this tag to abandon the run.
    (catch 'boom
      ;; Take the whole pending list as this round's agenda and leave
      ;; the pending list empty so handlers can refill it for the next
      ;; round.  An empty agenda ends the run.
      (loop for agenda = (shuffle (copy-list (shiftf *pending-processes* '())))
            while agenda
            do (dolist (*current-process* agenda)
                 (let* ((message (or (receive *current-process*)
                                     (make-instance 'message
                                                    :sender 'scheduler
                                                    :payload nil)))
                        ;; Lets RESPOND find the sender during the call.
                        (*message-source* (message-sender message)))
                   (handle-message *current-process*
                                   (message-payload message))))))))
(defun schedule (process)
  "Queue PROCESS to run in the next scheduler round by adding it to the
front of *PENDING-PROCESSES*.  Returns the updated pending list."
  (setf *pending-processes* (cons process *pending-processes*)))
(defun spawn (process-name &rest args)
  "Call the process constructor designated by PROCESS-NAME with ARGS
and schedule the process it returns."
  (let ((new-process (apply process-name args)))
    (schedule new-process)))
(defun spawn-one (process-name)
  "Spawn a process named PROCESS-NAME unless one is already pending.

NOTE(review): only *PENDING-PROCESSES* is consulted.  SCHEDULER empties
that list at the start of each round, so a same-named process that is
on the current round's agenda but has not yet rescheduled itself is
not seen and a second one will be spawned."
  (unless (find process-name *pending-processes* :key #'process-name)
    (spawn process-name)))
(defun respawn ()
  "Queue the currently running process (*CURRENT-PROCESS*) to run again
in the next round."
  (let ((self *current-process*))
    (schedule self)))
(defun halt ()
  "Stop the scheduler immediately by throwing NIL to the BOOM catch tag
that SCHEDULER establishes.  Does not return."
  (throw 'boom nil))
(defun exit ()
  "Exit the currently running process.

This global definition only signals an error: inside a DEFPROCESS body
EXIT is shadowed by a local function that leaves the process, so
reaching this one means EXIT was called outside any process body."
  (error "There is no currently running process."))
(defmacro defprocess (name (&rest state-vars) (&optional message-var) &body forms)
  "Define a process constructor called NAME.

The expansion defines a function NAME that accepts STATE-VARS as
keyword parameters and returns a PROCESS named NAME.  The process's
message handler is a closure over those parameters, so they persist
between messages.  MESSAGE-VAR, when given, is bound to each incoming
message; otherwise an anonymous variable is used.  A leading string in
FORMS becomes the constructor's docstring.

Inside FORMS two local operators are available:
  (EXIT)            leaves the handler without rescheduling the process;
  (INITIALLY . body) runs BODY only on the first message handled.

After FORMS finish normally the process reschedules itself with
RESPAWN."
  (let* ((first-time (gensym))
         (docstring (when (stringp (first forms))
                      (pop forms)))
         (message-var (or message-var (gensym))))
    `(progn
       (defun ,name (&key ,@state-vars)
         ,@(and docstring (list docstring))
         (make-instance
          'process
          :name ',name
          :message-handler
          ;; Per-process flag consulted by INITIALLY.
          (let ((,first-time t))
            (lambda (,message-var)
              (declare (ignorable ,message-var))
              (block ,name
                (flet ((exit ()
                         ;; Skips the RESPAWN below, ending the process.
                         (return-from ,name)))
                  (declare (ignorable #'exit))
                  (macrolet ((initially (&body body)
                               `(when ,',first-time
                                  ,@body)))
                    ,@forms)
                  (setf ,first-time nil)
                  (respawn))))))))))
(defprocess add-client ((num-requests 0)) (res)
  "Spawns an add server and requests solutions of it.

On a turn with no incoming message it posts two random integers below
10 to ADD-SERVER; on a turn with a message it prints that message.  It
halts the scheduler right after posting the request that brings
NUM-REQUESTS to 10, so that last request is never answered."
  (initially (spawn-one 'add-server))
  (if res
      (format t "~A~%" res)
      (let ((operands (list (random 10) (random 10))))
        (post 'add-server operands)
        (incf num-requests)
        (unless (< num-requests 10)
          (halt)))))
(defprocess add-server () (msg)
  "Takes two numbers and responds with a list of them and their sum.
Turns with no incoming message do nothing."
  (unless (null msg)
    (destructuring-bind (augend addend) msg
      (let ((sum (+ augend addend)))
        (respond (list augend addend sum))))))
(defun add-client-example ()
  "Example of simple client-server communication: run the scheduler
with a single ADD-CLIENT process as the initial agenda."
  (let ((client (add-client)))
    (scheduler :initial (list client))))
(defprocess ping () (ball)
  "Send a ball to pong.

On its first turn PING creates the ball itself; afterwards it acts only
on turns where a ball arrives.  Each time it holds a ball it prints
PING and posts the ball to PONG."
  (initially (setf ball 'ball))
  ;; BALL is the handler's parameter and is rebound on every call, so
  ;; there is no need to clear it after passing the ball on.
  (when ball
    (print 'ping)
    (post 'pong ball)))
(defprocess pong () (ball)
  "Send a ball to ping.

Acts only on turns where a ball arrives: prints PONG and posts the ball
to PING."
  ;; BALL is the handler's parameter and is rebound on every call, so
  ;; there is no need to clear it after passing the ball on.
  (when ball
    (print 'pong)
    (post 'ping ball)))
(defprocess timebomb ((timer 10)) ()
  "Halt when a decrementing timer reaches zero.
TIMER is decremented once per turn; incoming messages are ignored."
  (decf timer)
  (unless (plusp timer)
    (halt)))
(defun ping-pong-example ()
  "Example of peer communication: PING and PONG pass a ball back and
forth until a TIMEBOMB process halts the scheduler."
  (let ((players (list (ping) (pong) (timebomb))))
    (scheduler :initial players)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment