Skip to content

Instantly share code, notes, and snippets.

@mopemope
Created April 22, 2015 06:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mopemope/a0e9878a4ef9a2ea2310 to your computer and use it in GitHub Desktop.
Save mopemope/a0e9878a4ef9a2ea2310 to your computer and use it in GitHub Desktop.
ppool
;;; ppool-nagger: demo gen_server worker.
;;;
;;; State is the tuple #(task delay max send-to).  Every time the gen_server
;;; timeout of `delay` milliseconds fires, the server sends #(pid task) to
;;; `send-to`.  `max` is either the atom `infinity` (nag forever) or the
;;; number of nags still to send; the server stops normally after the last.
(defmodule ppool-nagger
  (export all)
  (behavior gen_server))

;; Start a linked nagger process; the four arguments become its state.
(defun start-link (task delay max send-to)
  (gen_server:start_link (MODULE) `#(,task ,delay ,max ,send-to) '()))

;; Underscore-named alias of start-link/4.
(defun start_link (task delay max send-to)
  (start-link task delay max send-to))

;; Synchronously ask the nagger `pid` to stop; returns `ok`.
(defun stop (pid)
  (gen_server:call pid 'stop))

;; gen_server:init/1 - keep the arguments as state and arm the first timeout.
(defun init
  ((`#(,task ,delay ,max ,send-to))
   `#(ok #(,task ,delay ,max ,send-to) ,delay)))

;; gen_server:handle_call/3
;; `stop` replies `ok` and terminates normally.  Any other request is left
;; unanswered (as before), but the timeout is re-armed: returning noreply
;; without a timeout would cancel the pending one and the nagger would never
;; nag or terminate again.
(defun handle_call
  (('stop _from state)
   `#(stop normal ok ,state))
  ((_msg _from (= `#(,_task ,delay ,_max ,_send-to) state))
   `#(noreply ,state ,delay)))

;; gen_server:handle_cast/2 - ignore the cast but keep the timeout running.
(defun handle_cast
  ((_msg (= `#(,_task ,delay ,_max ,_send-to) state))
   `#(noreply ,state ,delay)))

;; gen_server:handle_info/2
;; On `timeout`: send the nag, then either re-arm (infinite or remaining
;; count > 1) or stop normally once the last nag has been sent.
;; Any other message is ignored and the timeout is re-armed instead of
;; crashing with function_clause.
(defun handle_info
  (('timeout `#(,task ,delay ,max ,send-to))
   (! send-to `#(,(self) ,task))
   (cond ((=:= max 'infinity)
          `#(noreply #(,task ,delay ,max ,send-to) ,delay))
         ((=< max 1)
          `#(stop normal #(,task ,delay 0 ,send-to)))
         ((> max 1)
          `#(noreply #(,task ,delay ,(- max 1) ,send-to) ,delay))))
  ((_msg (= `#(,_task ,delay ,_max ,_send-to) state))
   `#(noreply ,state ,delay)))

;; gen_server:code_change/3 - state is carried over unchanged.
(defun code_change (_oldvsn state _extra)
  `#(ok ,state))

;; gen_server:terminate/2 - nothing to clean up.
(defun terminate (_reason _state)
  'ok)
;;; ppool-serv: gen_server that manages the workers of one pool.
;;;
;;; It asks the pool supervisor `sup` to start a ppool-worker-sup, then starts
;;; workers under it on request.  `limit` in the state is the number of free
;;; slots left; `refs` is a gb_set of monitor references of running workers;
;;; `queue` holds tasks waiting for a free slot.
(defmodule ppool-serv
  (export all)
  (behavior gen_server))

;;; API

;; Start an unlinked pool server registered locally as `name`.
;; Only matches when `name` is an atom and `limit` an integer.
(defun start
  ((name limit sup mfa) (when (and (is_atom name) (is_integer limit)))
   (gen_server:start `#(local ,name) (MODULE) `#(,limit ,mfa ,sup) '())))

;; Same as start/4 but linked to the caller.
(defun start-link
  ((name limit sup mfa) (when (and (is_atom name) (is_integer limit)))
   (gen_server:start_link `#(local ,name) (MODULE) `#(,limit ,mfa ,sup) '())))

;; Underscore-named alias of start-link/4.
(defun start_link (name limit sup mfa)
  (start-link name limit sup mfa))

;; Run a task now: replies #(ok pid) or `noalloc` when no slot is free.
(defun run (name args)
  (gen_server:call name `#(run ,args)))

;; Run a task, blocking (no call timeout) until a slot is available.
(defun sync-queue (name args)
  (gen_server:call name `#(sync ,args) 'infinity))

;; Underscore-named alias of sync-queue/2.
(defun sync_queue (name args)
  (sync-queue name args))

;; Run a task as soon as a slot is available, without waiting for it.
(defun async-queue (name args)
  (gen_server:cast name `#(async ,args)))

;; Underscore-named alias of async-queue/2.
(defun async_queue (name args)
  (async-queue name args))

;; Synchronously stop the pool server.
(defun stop (name)
  (gen_server:call name 'stop))

;; Child spec used to start the worker supervisor for `mfa`.
(defun spec (mfa)
  `#(worker_sup
     #(ppool-worker-sup start_link (,mfa))
     temporary 10000 supervisor (ppool-worker-sup)))

;; Server state: free slots, worker supervisor pid, monitor refs, task queue.
(defrecord state
  (limit 0)
  sup
  refs
  (queue (queue:new)))

;; Start one worker under `sup` with `args` and monitor it.
;; Returns #(pid monitor-ref); crashes if the child cannot be started.
(defun start-worker (sup args)
  (let ((`#(ok ,pid) (supervisor:start_child sup args)))
    (tuple pid (erlang:monitor 'process pid))))

;;; gen_server callbacks

;; gen_server:init/1
;; The worker supervisor cannot be started from here (the parent supervisor
;; is still busy starting us), so a message to self defers it.
(defun init
  ((`#(,limit ,mfa ,sup))
   (! (self) `#(start_worker_supervisor ,sup ,mfa))
   `#(ok ,(make-state limit limit refs (gb_sets:empty)))))

;; gen_server:handle_info/2
(defun handle_info
  ;; A monitored process died: only react if it is one of our workers.
  ((`#(DOWN ,ref process ,_pid ,_) (= (match-state refs refs) s))
   (io:format "received down msg ~n")
   (case (gb_sets:is_element ref refs)
     ('true (handle-down-worker ref s))
     ('false `#(noreply ,s))))
  ;; Deferred start of the worker supervisor requested by init/1.
  ((`#(start_worker_supervisor ,sup ,mfa) (= (match-state) s))
   (let ((`#(ok ,pid) (supervisor:start_child sup (spec mfa))))
     (link pid)
     `#(noreply ,(set-state s sup pid))))
  ((msg state)
   (io:format "Unknown msg : ~p~n" (list msg))
   `#(noreply ,state)))

;; gen_server:handle_call/3
(defun handle_call
  ;; run: start immediately when a slot is free ...
  ((`#(run ,args) _from (= (match-state limit n sup sup refs r) s)) (when (> n 0))
   (let ((`#(,pid ,ref) (start-worker sup args)))
     `#(reply #(ok ,pid) ,(set-state s limit (- n 1) refs (gb_sets:add ref r)))))
  ;; ... otherwise refuse.
  ((`#(run ,_args) _from (= (match-state limit n) s)) (when (=< n 0))
   `#(reply noalloc ,s))
  ;; sync: start immediately when a slot is free ...
  ((`#(sync ,args) _from (= (match-state limit n sup sup refs r) s)) (when (> n 0))
   (let ((`#(,pid ,ref) (start-worker sup args)))
     `#(reply #(ok ,pid) ,(set-state s limit (- n 1) refs (gb_sets:add ref r)))))
  ;; ... otherwise queue the caller; handle-down-worker/2 replies later.
  ;; This must be match-state: make-state here would build a pattern out of
  ;; the record defaults, which never matches a live state.
  ((`#(sync ,args) from (= (match-state queue q) s))
   `#(noreply ,(set-state s queue (queue:in `#(,from ,args) q))))
  ;; stop
  (('stop _from state)
   `#(stop normal ok ,state))
  ((_msg _from state)
   `#(noreply ,state)))

;; gen_server:handle_cast/2
(defun handle_cast
  ;; async: start immediately when a slot is free, otherwise queue the args.
  ((`#(async ,args) (= (match-state limit n sup sup refs r) s)) (when (> n 0))
   (let ((`#(,_pid ,ref) (start-worker sup args)))
     `#(noreply ,(set-state s limit (- n 1) refs (gb_sets:add ref r)))))
  ((`#(async ,args) (= (match-state limit n queue q) s)) (when (=< n 0))
   `#(noreply ,(set-state s queue (queue:in args q))))
  ((_msg state)
   `#(noreply ,state)))

;; gen_server:code_change/3 - state is carried over unchanged.
(defun code_change (_oldvsn state _extra)
  `#(ok ,state))

;; gen_server:terminate/2 - nothing to clean up.
(defun terminate (_reason _state)
  'ok)

;; handle-down-worker/2
;; A worker with monitor `ref` finished.  If a task is queued, start it in the
;; freed slot (replying to the waiting caller for sync entries, which are
;; stored as #(from args)); otherwise give the slot back.
(defun handle-down-worker
  ((ref (= (match-state limit l sup sup refs refs) s))
   (case (queue:out (state-queue s))
     (`#(#(value #(,from ,args)) ,q)
      (let* ((`#(,pid ,new-ref) (start-worker sup args))
             (new-refs (gb_sets:insert new-ref (gb_sets:delete ref refs))))
        (gen_server:reply from `#(ok ,pid))
        `#(noreply ,(set-state s refs new-refs queue q))))
     (`#(#(value ,args) ,q)
      (let* ((`#(,_pid ,new-ref) (start-worker sup args))
             (new-refs (gb_sets:insert new-ref (gb_sets:delete ref refs))))
        `#(noreply ,(set-state s refs new-refs queue q))))
     (`#(empty ,_q)
      `#(noreply ,(set-state s limit (+ l 1) refs (gb_sets:delete ref refs)))))))
;;; ppool-sup: per-pool supervisor.  Its only static child is the pool's
;;; ppool-serv process, which receives this supervisor's pid as an argument.
(defmodule ppool-sup
  (export all)
  (behavior supervisor))

;; Start the pool supervisor linked to the caller.
(defun start-link (name limit mfa)
  (supervisor:start_link (MODULE) (tuple name limit mfa)))

;; Underscore-named alias of start-link/3.
(defun start_link (name limit mfa)
  (start-link name limit mfa))

;; supervisor:init/1
;; one_for_all, at most 1 restart per 3600 seconds, with a single permanent
;; `serv` worker started through ppool-serv:start_link/4.
(defun init
  ((`#(,name ,limit ,mfa))
   (let* ((restart-policy (tuple 'one_for_all 1 3600))
          (serv-args (list name limit (self) mfa))
          (serv-child (tuple 'serv
                             (tuple 'ppool-serv 'start_link serv-args)
                             'permanent 5000 'worker '(ppool-serv))))
     (tuple 'ok (tuple restart-policy (list serv-child))))))
;;; ppool-supersup: top-level supervisor, registered locally as `ppool`.
;;; It starts with no children; each pool is added as a ppool-sup child.
(defmodule ppool-supersup
  (export all)
  (behavior supervisor))

;; Start the top-level supervisor linked to the caller.
(defun start-link ()
  (supervisor:start_link (tuple 'local 'ppool) (MODULE) '()))

;; Underscore-named alias of start-link/0.
(defun start_link ()
  (start-link))

;; Kill the registered `ppool` process if there is one; otherwise return `ok`.
(defun stop ()
  (let ((registered (whereis 'ppool)))
    (if (is_pid registered)
      (exit registered 'kill)
      'ok)))

;; supervisor:init/1 - one_for_all, 6 restarts per 3600 seconds, no children.
(defun init
  (('())
   (tuple 'ok (tuple (tuple 'one_for_all 6 3600) '()))))

;; Add a pool called `name` as a permanent ppool-sup child.
(defun start-pool (name limit mfa)
  (supervisor:start_child
   'ppool
   (tuple name
          (tuple 'ppool-sup 'start_link (list name limit mfa))
          'permanent 10500 'supervisor '(ppool-sup))))

;; Underscore-named alias of start-pool/3.
(defun start_pool (name limit mfa)
  (start-pool name limit mfa))

;; Terminate the pool `name`, then remove its child spec.
;; Returns the result of supervisor:delete_child/2.
(defun stop-pool (name)
  (progn
    (supervisor:terminate_child 'ppool name)
    (supervisor:delete_child 'ppool name)))

;; Underscore-named alias of stop-pool/1.
(defun stop_pool (name)
  (stop-pool name))
;;; ppool-worker-sup: simple_one_for_one supervisor under which the workers
;;; of one pool are started.
(defmodule ppool-worker-sup
  (export all)
  (behavior supervisor))

;; Start the worker supervisor linked to the caller.
;; Only matches when `mfa` is a 3-element tuple.
(defun start-link
  ((mfa) (when (andalso (is_tuple mfa) (=:= (tuple_size mfa) 3)))
   (supervisor:start_link (MODULE) mfa)))

;; Underscore-named alias of start-link/1.
(defun start_link (mfa)
  (start-link mfa))

;; supervisor:init/1
;; simple_one_for_one, 5 restarts per 3600 seconds; the single child
;; template is a temporary worker started through #(m f a).
(defun init
  ((`#(,m ,f ,a))
   (let ((worker-template (tuple 'ppool-worker
                                 (tuple m f a)
                                 'temporary 5000 'worker (list m))))
     (tuple 'ok
            (tuple (tuple 'simple_one_for_one 5 3600)
                   (list worker-template))))))
;;; ppool: public facade of the process pool.  Every function delegates to
;;; ppool-supersup (pool lifecycle) or ppool-serv (task submission).
(defmodule ppool
  (export all))

;; Start the top-level pool supervisor.
(defun start-link ()
  (ppool-supersup:start-link))

;; Underscore-named alias of start-link/0.
(defun start_link ()
  (start-link))

;; Stop the top-level pool supervisor.
(defun stop ()
  (ppool-supersup:stop))

;; Create a pool `name` with `limit` slots; the third argument must be a
;; 3-element tuple and is passed on unchanged.
(defun start-pool
  ((name limit (= `#(,_m ,_f ,_a) mfa))
   (ppool-supersup:start-pool name limit mfa)))

;; Underscore-named alias of start-pool/3.
(defun start_pool (name limit mfa)
  (start-pool name limit mfa))

;; Shut down and remove the pool `name`.
(defun stop-pool (name)
  (ppool-supersup:stop-pool name))

;; Underscore-named alias of stop-pool/1.
(defun stop_pool (name)
  (stop-pool name))

;; Run a task in pool `name` right away (see ppool-serv:run/2).
(defun run (name args)
  (ppool-serv:run name args))

;; Queue a task without waiting (see ppool-serv:async-queue/2).
(defun async-queue (name args)
  (ppool-serv:async-queue name args))

;; Underscore-named alias of async-queue/2.
(defun async_queue (name args)
  (async-queue name args))

;; Queue a task and wait until it is started (see ppool-serv:sync-queue/2).
(defun sync-queue (name args)
  (ppool-serv:sync-queue name args))

;; Underscore-named alias of sync-queue/2.
(defun sync_queue (name args)
  (sync-queue name args))

;; Manual demo: starts a 2-slot `nagger` pool, submits tasks through run,
;; async-queue and sync-queue, flushing the caller's mailbox with c:flush/0
;; after each 10 second pause, then tears everything down.
(defun test ()
  (let ((me (self)))
    (ppool:start-link)
    (ppool:start-pool 'nagger 2 (tuple 'ppool-nagger 'start_link '()))
    ;; three immediate runs against a pool with two slots
    (lists:foreach
     (lambda (task) (ppool:run 'nagger (list task 5000 2 me)))
     '("finish the chapter!" "Watch a good movie" "clean up bit a"))
    (timer:sleep 10000)
    (c:flush)
    (lists:foreach
     (lambda (task) (ppool:async-queue 'nagger (list task 2000 1 me)))
     '("Pay the bills" "Take a shower"))
    (timer:sleep 10000)
    (c:flush)
    (lists:foreach
     (lambda (task) (ppool:sync-queue 'nagger (list task 2000 1 me)))
     '("Pet a dog" "Make some noise"))
    (timer:sleep 10000)
    (c:flush)
    (ppool:stop-pool 'nagger)
    (ppool:stop)
    (c:flush)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment