Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
actor
(in-package :actor)
;; The process on whose behalf the current code runs. The global value is a
;; non-background process (:background nil); receive-impl routes such
;; processes to the blocking receive-repl path. Scheduled tasks rebind this
;; variable dynamically to the process they belong to.
(defparameter *current-process* (make-process :background nil))
;; `self` is a symbol macro, so it always reads the current dynamic binding
;; of *current-process*.
(define-symbol-macro self *current-process*)
;; fork
(defun fork-impl (fn)
  "Create a fresh process, schedule FN to run as that process, and return
the new process object."
  (let ((new-process (make-process)))
    ;; Hand the body over to the scheduler; PROG1 keeps the process itself
    ;; as the return value.
    (prog1 new-process
      (execute-fork-fn fn new-process))))
(defmacro fork (&body body)
  "Wrap BODY in a zero-argument function and pass it to FORK-IMPL."
  (let ((thunk `(lambda () ,@body)))
    `(fork-impl ,thunk)))
;; send
(defun send (message destination)
  "Deliver MESSAGE to the process DESTINATION. Returns no values.

While holding the destination's lock: if the process has a pending receive
function, that function is cleared and scheduled with MESSAGE; otherwise
MESSAGE is appended to the mailbox."
  (with-slots (fn mailbox lock) (the process destination)
    (sb-thread:with-mutex (lock :wait-p t)
      (cond
        ;; No receiver is waiting: queue the message for a later receive.
        ((null fn)
         (lock-free-queue:enq message mailbox))
        ;; A receiver is waiting: take its function (resetting the slot to
        ;; NIL) and schedule it with this message.
        (t
         (execute-receive-fn (shiftf fn nil) message destination)))))
  (values))
;; receive
(defun receive-repl (receive-fn)
  "Blocking receive used for the REPL (non-background) process.

Polls the current process's mailbox every 0.01 seconds until it is
non-empty, then calls RECEIVE-FN on the dequeued message. Returns NIL."
  (with-slots (mailbox) *current-process*
    ;; Wait until a message shows up.
    (loop
      (unless (lock-free-queue:empty-p mailbox)
        (return))
      (sleep 0.01))
    (funcall receive-fn (lock-free-queue:deq mailbox))
    nil))
(defun receive-background (receive-fn)
  "Non-blocking receive used for background processes. Returns no values.

While holding the current process's lock: if the mailbox is empty,
RECEIVE-FN is stored in the process so a later SEND can schedule it;
otherwise one message is dequeued and RECEIVE-FN is scheduled with it."
  (with-slots (fn mailbox lock) *current-process*
    (sb-thread:with-mutex (lock :wait-p t)
      (cond
        ;; Nothing to read yet: mark the process as waiting on receive.
        ((lock-free-queue:empty-p mailbox)
         (setf fn receive-fn))
        ;; A message is already available: schedule the handler right away.
        (t
         (execute-receive-fn receive-fn
                             (lock-free-queue:deq mailbox)
                             *current-process*)))))
  (values))
(defun receive-impl (receive-fn)
  "Dispatch RECEIVE-FN to the receive strategy of the current process:
RECEIVE-BACKGROUND for background processes, RECEIVE-REPL otherwise."
  (let ((strategy (if (process-background *current-process*)
                      #'receive-background
                      #'receive-repl)))
    (funcall strategy receive-fn)))
(defmacro receive (message &body body)
  "Receive one message and evaluate BODY with it bound.

MESSAGE is either a symbol, which is bound to the whole message, or a
destructuring pattern such as (from payload), which is matched against the
message with DESTRUCTURING-BIND. BODY is wrapped in a one-argument function
that is handed to RECEIVE-IMPL."
  (if (consp message)
      ;; A list pattern cannot be used directly as a lambda parameter, so
      ;; bind the raw message to a fresh symbol and destructure it.
      (let ((raw (gensym "MESSAGE")))
        `(receive-impl
          (lambda (,raw)
            (destructuring-bind ,message ,raw
              ,@body))))
      `(receive-impl
        (lambda (,message)
          ,@body))))
;; Package definition for the actor library. Only the process type, the
;; `self` symbol macro and the fork / receive / send operations are exported.
(defpackage actor
(:use :common-lisp)
(:export process
self
fork
receive
send))
;; NOTE(review): the two consecutive in-package forms below look like the
;; boundary between separately loaded files pasted together -- confirm.
(in-package :actor)
(in-package :actor)
;; An actor process.
;;   fn         - receive function stored while the process waits for a
;;                message, or NIL when it is not waiting
;;   mailbox    - queue of messages that arrived while nobody was waiting
;;   background - true by default; false selects the blocking REPL-style
;;                receive
;;   lock       - mutex taken by send and receive-background around their
;;                accesses to fn and mailbox
(defstruct process
(fn nil :type (or null function))
(mailbox (lock-free-queue:make) :type lock-free-queue:queue) ; https://gist.github.com/3253404
(background t :type boolean)
(lock (sb-thread:make-mutex)))
;; Print a process as an unreadable object showing only its type and
;; identity, without dumping its slots.
(defmethod print-object ((proc process) stream)
  (print-unreadable-object (proc stream :identity t :type t)))
%% http://rainyday.blog.so-net.ne.jp/2007-05-20
-module(ring).
-export([start/2, make_ring/3]).
%% start(N, M): build a ring of N processes, pass `message` around it M
%% times, and print the elapsed runtime and wall-clock time.
start(N, M) ->
    %% Reset the "time since last call" counters read at the end.
    statistics(runtime),
    statistics(wall_clock),
    First = spawn(?MODULE, make_ring, [N-1, M, self()]),
    %% The last process in the chain announces itself; tell it who the first
    %% process is so that the chain closes into a ring.
    receive
        {Last} -> Last ! {First}
    end,
    %% Inject the message that will circulate.
    First ! message,
    %% Last is already bound, so this only matches the completion notice
    %% coming from that same process.
    receive
        {Last} -> ok
    end,
    {_, Time1} = statistics(runtime),
    {_, Time2} = statistics(wall_clock),
    %% Plain io:format/2: the leading-dot form (.io:format) belonged to the
    %% removed "packages" feature and no longer parses.
    io:format("N = ~p, M = ~p; elapsed time = ~p (~p) miliseconds~n", [N, M, Time1, Time2]).
%% make_ring(N, M, Top): body of one ring member.
%% The member with N =:= 0 is the last one spawned: it reports its pid to
%% Top, waits to be told its neighbor, relays M messages, then reports to
%% Top again. Every other member spawns the next member and relays M
%% messages to it.
make_ring(0, M, Top) ->
    Top ! {self()},
    Next = receive
               {Pid} -> Pid
           end,
    loop(Next, M),
    Top ! {self()};
make_ring(N, M, Top) ->
    Next = spawn(?MODULE, make_ring, [N-1, M, Top]),
    loop(Next, M).
%% loop(Neighbor, Remaining): forward the atom `message` to Neighbor
%% Remaining times, then return ok.
loop(_Neighbor, 0) ->
    ok;
loop(Neighbor, Remaining) ->
    receive
        message ->
            Neighbor ! message,
            loop(Neighbor, Remaining - 1)
    end.
(defun relay (neighbor)
  "Forward every message received by the current process to NEIGHBOR.
After forwarding a message, the handler registers itself again for the
next one."
  (actor:receive message
    (actor:send message neighbor)
    (relay neighbor)))
(defun make-ring-processes (proc-num &aux (parent actor:self))
  "Fork PROC-NUM relay processes chained so that each forwards to the one
created before it; the first one created forwards to the calling process.
Returns the most recently created process, or the calling process itself
when PROC-NUM is zero."
  ;; WITH is a variable clause and must come before main clauses such as
  ;; REPEAT in a conforming LOOP form.
  (loop WITH neighbor = parent
        REPEAT proc-num
        DO
           ;; Bind a fresh variable per iteration: the forked body is a
           ;; closure, and NEIGHBOR itself is reassigned just below.
           (let ((next neighbor))
             (setf neighbor (actor:fork (relay next))))
        FINALLY (return neighbor)))
;; Benchmark: build a chain of 1000 relay processes that ends at the calling
;; process, push 1000 messages into its head, and print each message as it
;; arrives back here.
(time
 (let ((head (make-ring-processes 1000)))
   (dotimes (i 1000)
     (actor:send (list :message i) head))
   (loop REPEAT 1000
         DO (actor:receive message
              (print message)))))
;; Echo example: the forked process expects a message of the form
;; (from payload), prints the payload and sends it back to FROM.
;; The message is bound to a plain symbol and destructured explicitly,
;; because a list pattern is not a valid lambda parameter.
(actor:fork
  (actor:receive envelope
    (destructuring-bind (from message) envelope
      (print message)
      (actor:send message from))))
;; Fork a relay that forwards everything back to the calling process.
;; actor:self is read here, outside the forked body, because inside the
;; body it refers to the forked process instead.
(let ((caller actor:self))
  (actor:fork
    (relay caller)))
;; Fork a process that waits for one message and prints it in a list
;; after the number 10.
(actor:fork
  (actor:receive payload
    (let ((report (list 10 payload)))
      (print report))))
;; One million forked processes, each registering a receive whose handler
;; does nothing, collected in creation order.
(defparameter *procs*
  (let ((procs '()))
    (dotimes (i 1000000 (nreverse procs))
      (declare (ignorable i))
      (push (actor:fork (actor:receive msg)) procs))))
(in-package :actor)
;; Shared task queue: execute-fork-fn and execute-receive-fn enqueue
;; zero-argument functions here, and the worker threads dequeue and call them.
(defparameter *queue* (lock-free-queue:make))
;; Number of worker threads created for *workers*.
(defparameter *worker-num* 8)
;; Worker threads that drain *queue*. Each worker loops forever: it tries to
;; dequeue a task and calls it, or sleeps 0.01 seconds when the dequeue
;; reports that nothing was available.
(defparameter *workers*
  (flet ((worker-loop ()
           (loop
             (multiple-value-bind (task found-p) (lock-free-queue:deq *queue*)
               (cond (found-p (funcall task))
                     (t (sleep 0.01)))))))
    (loop REPEAT *worker-num*
          COLLECT (sb-thread:make-thread #'worker-loop))))
(defun execute-fork-fn (fn process)
  "Enqueue on *QUEUE* a task that calls FN with *CURRENT-PROCESS* bound to
PROCESS."
  (flet ((task ()
           (let ((*current-process* process))
             (funcall fn))))
    (lock-free-queue:enq #'task *queue*)))
(defun execute-receive-fn (fn message process)
  "Enqueue on *QUEUE* a task that calls FN on MESSAGE with *CURRENT-PROCESS*
bound to PROCESS."
  (let ((task (lambda ()
                (let ((*current-process* process))
                  (funcall fn message)))))
    (lock-free-queue:enq task *queue*)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment