Created
August 4, 2012 01:23
-
-
Save sile/3253290 to your computer and use it in GitHub Desktop.
actor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(in-package :actor) | |
(defparameter *current-process* (make-process :background nil)) | |
(define-symbol-macro self *current-process*) | |
;; fork | |
(defun fork-impl (fn) | |
(let ((proc (make-process))) | |
(execute-fork-fn fn proc) ; forkしたプロセスを実行する | |
proc)) | |
(defmacro fork (&body body) | |
`(fork-impl (lambda () ,@body))) | |
;; send | |
(defun send (message destination) | |
(with-slots (fn mailbox lock) (the process destination) | |
(sb-thread:with-mutex (lock :wait-p t) ; ロック | |
(if fn | |
;; a] プロセスがreceive待ちの状態なら、すぐに実行する | |
(let ((receive-fn fn)) | |
(setf fn nil) | |
(execute-receive-fn receive-fn message destination)) | |
;; b] receive待ちでないなら、メールボックスに追加 | |
(lock-free-queue:enq message mailbox)))) | |
(values)) | |
;; receive | |
(defun receive-repl (receive-fn) ; REPL用 | |
(with-slots (fn mailbox) *current-process* | |
;; メッセージを受信するまでブロックする | |
(loop WHILE (lock-free-queue:empty-p mailbox) | |
DO (sleep 0.01) | |
FINALLY | |
(funcall receive-fn (lock-free-queue:deq mailbox))))) | |
(defun receive-background (receive-fn) ; バックグラウンドプロセス用 | |
(with-slots (fn mailbox lock) *current-process* | |
(sb-thread:with-mutex (lock :wait-p t) ; ロック | |
(if (lock-free-queue:empty-p mailbox) | |
;; a] メールボックスが空なら、receive待ちの状態に設定する | |
(setf fn receive-fn) | |
;; b] 利用可能なメッセージがあるなら、すぐに実行する | |
(let ((message (lock-free-queue:deq mailbox))) | |
(execute-receive-fn receive-fn message *current-process*))))) | |
(values)) | |
(defun receive-impl (receive-fn) | |
(if (process-background *current-process*) | |
(receive-background receive-fn) | |
(receive-repl receive-fn))) | |
(defmacro receive (message &body body) | |
`(receive-impl | |
(lambda (,message) | |
,@body))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defpackage actor | |
(:use :common-lisp) | |
(:export process | |
self | |
fork | |
receive | |
send)) | |
(in-package :actor) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(in-package :actor) | |
(defstruct process | |
(fn nil :type (or null function)) | |
(mailbox (lock-free-queue:make) :type lock-free-queue:queue) ; https://gist.github.com/3253404 | |
(background t :type boolean) | |
(lock (sb-thread:make-mutex))) | |
(defmethod print-object ((o process) stream) | |
(print-unreadable-object (o stream :type t :identity t))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% http://rainyday.blog.so-net.ne.jp/2007-05-20 | |
-module(ring). | |
-export([start/2, make_ring/3]). | |
start(N,M) -> | |
statistics(runtime), | |
statistics(wall_clock), | |
First = spawn(?MODULE, make_ring, [N-1, M, self()]), | |
receive | |
{Last} -> Last ! {First} end, | |
First ! message, | |
receive | |
{Last} -> ok end, | |
{_,Time1} = statistics(runtime), | |
{_,Time2} = statistics(wall_clock), | |
.io:format("N = ~p, M = ~p; elapsed time = ~p (~p) miliseconds~n", [N, M, Time1, Time2]). | |
make_ring(0,M,Top) -> | |
Top ! {self()}, | |
receive | |
{Neighbor} -> ok end, | |
loop(Neighbor, M), | |
Top ! {self()}; | |
make_ring(N,M,Top) -> | |
Neighbor = spawn(?MODULE, make_ring, [N-1, M, Top]), | |
loop(Neighbor, M). | |
loop(_,0) -> ok; | |
loop(Neighbor,M) -> | |
% .io:format("Pid = ~p, M = ~p~n", [self(), M]), | |
receive | |
message -> Neighbor ! message end, | |
loop(Neighbor, M-1). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defun relay (neighbor) | |
(labels ((recur () | |
(actor:receive message | |
;;(print (list actor:self neighbor)) | |
;; (sleep 10) | |
(actor:send message neighbor) | |
(recur)))) | |
(recur))) | |
(defun make-ring-processes (proc-num &aux (parent actor:self)) | |
(loop REPEAT proc-num | |
WITH neighbor = parent | |
DO | |
(let ((next neighbor)) | |
(setf neighbor (actor:fork (relay next)))) | |
FINALLY (return neighbor))) | |
(time | |
(let ((first (make-ring-processes 1000))) | |
(loop FOR i FROM 0 BELOW 1000 | |
DO (actor:send (list :message i) first)) | |
(loop REPEAT 1000 | |
DO (actor:receive message | |
(print message))))) | |
(actor:fork | |
(actor:receive (from message) | |
(print message) | |
(actor:send message from))) | |
(let ((neighbor actor:self)) | |
(actor:fork (relay neighbor))) | |
(actor:fork | |
(actor:receive msg | |
(print (list 10 msg)))) | |
(defparameter *procs* | |
(loop REPEAT 1000000 | |
COLLECT (actor:fork (actor:receive msg)))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(in-package :actor) | |
(defparameter *queue* (lock-free-queue:make)) | |
(defparameter *worker-num* 8) | |
(defparameter *workers* | |
(loop REPEAT *worker-num* | |
COLLECT (sb-thread:make-thread | |
(lambda () | |
(loop | |
(multiple-value-bind (fn ok?) (lock-free-queue:deq *queue*) | |
(if ok? | |
(funcall fn) | |
(sleep 0.01)))))))) | |
(defun execute-fork-fn (fn process) | |
(lock-free-queue:enq (lambda () | |
(let ((*current-process* process)) | |
(funcall fn))) | |
*queue*)) | |
(defun execute-receive-fn (fn message process) | |
(lock-free-queue:enq (lambda () | |
(let ((*current-process* process)) | |
(funcall fn message))) | |
*queue*)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment