Skip to content

Instantly share code, notes, and snippets.

@edwinb
Created July 12, 2015 14:39
Show Gist options
  • Save edwinb/8374789251a45dc3d781 to your computer and use it in GitHub Desktop.
Save edwinb/8374789251a45dc3d781 to your computer and use it in GitHub Desktop.
Mucking about with processes again
module Process
import System.Concurrency.Raw
import Data.List
import System
-- A process ID is indexed by the interface of the process it refers to:
-- a request of type 'iface t' sent to that process has a response of
-- type 't'. The constructor wraps the raw 'Ptr' of the underlying thread
-- (as returned by 'fork' in the evaluator).
abstract data ProcID : (iface : Type -> Type) -> Type where
     MkPID : Ptr -> ProcID iface
-- A 'ProcID' with its interface hidden, so that servers with different
-- interfaces can be kept together in a single list (see 'ProcState').
data ServerID : Type where
     MkServer : ProcID iface -> ServerID
-- Whether a request is outstanding. There is either no request, or exactly
-- one, recorded as the process it was sent to paired with the type of the
-- reply that is expected back.
data ReqPending : Type where
     None : ReqPending
     One : (ProcID req, Type) -> ReqPending
-- The state of a process, tracked in its type. It records:
--   * whether a response to a request is pending
--   * the servers the process currently has an open connection to
--   * the number of clients currently connected to the process
-- With this information, process types can state that a process cannot
-- quit while it is talking to a server, or while it still has clients
-- expecting to communicate with it.
data ProcState : Type where
     MkProcState : ReqPending ->
                   (servers : List ServerID) ->
                   (clients : Nat) ->
                   ProcState
{-- Some useful predicates on process state --}
-- 'Pending st p t' holds when state 'st' records an outstanding request to
-- process 'p' whose reply has type 't'.
data Pending : ProcState -> ProcID req -> Type -> Type where
     IsPending : Pending (MkProcState (One (p, t)) s c) p t
-- 'NoPending st' holds when state 'st' has no outstanding request.
data NoPending : ProcState -> Type where
     IsNoPending : NoPending (MkProcState None s c)
-- 'Connected srv st' holds when 'srv' occurs in the server list of 'st'.
-- The proof carries the 'Elem' witness locating it in that list.
data Connected : ServerID -> ProcState -> Type where
     IsConnected : Elem p servers -> Connected p (MkProcState r servers c)
-- 'NoClient st' holds when the client count of 'st' is zero.
data NoClient : ProcState -> Type where
     IsNoClient : NoClient (MkProcState r servers 0)
-- 'OneClient st' holds when the client count of 'st' has the form 'S k',
-- i.e. at least one client is connected.
data OneClient : ProcState -> Type where
     IsOneClient : OneClient (MkProcState r servers (S k))
{-- Some useful operations on process state --}
-- Increment the client count; the pending request and the server list are
-- carried over unchanged.
newClient : ProcState -> ProcState
newClient (MkProcState pending srvs n) = MkProcState pending srvs (S n)
-- Replace the client count with the given number, keeping the pending
-- request and the server list as they were.
setClients : ProcState -> Nat -> ProcState
setClients (MkProcState pending srvs old) n = MkProcState pending srvs n
-- Add a server to the front of the list of open server connections.
newServer : ProcID iface -> ProcState -> ProcState
newServer pid (MkProcState pending srvs n)
    = MkProcState pending (MkServer pid :: srvs) n
-- Remove a server from the list of open connections. The 'Connected' proof
-- pinpoints which list entry to drop (via 'dropElem').
dropServer : (r : ProcID iface) -> (p : ProcState) ->
             Connected (MkServer r) p -> ProcState
dropServer pid (MkProcState pending srvs n) (IsConnected isElem)
    = MkProcState pending (dropElem srvs isElem) n
-- Record that a request has been sent to the given process. Only defined
-- for states with no request already pending; the new state remembers the
-- target process together with the reply type 't' of the request.
pendingReq : ProcID iface -> iface t -> (p : ProcState) -> NoPending p -> ProcState
pendingReq {t} pid req (MkProcState None srvs n) IsNoPending
    = MkProcState (One (pid, t)) srvs n
-- Clear the pending request once its reply has been received. The
-- 'Pending' proof guarantees the state really does hold a request to 'r'.
doneReq : (r : ProcID iface) -> (p : ProcState) -> Pending p r ty -> ProcState
doneReq pid (MkProcState (One (pid, replyTy)) srvs n) IsPending
    = MkProcState None srvs n
-- State of a server that is still in use: no pending request, no server
-- connections of its own, and one more client than the number given.
runningServer : Nat -> ProcState
runningServer k = MkProcState None [] (S k)
-- State of a server that may stop: nothing pending, no server connections
-- and no clients.
doneServer : ProcState
doneServer = MkProcState None [] Z
-- State with the given open server connections, nothing pending and no
-- clients.
init : List ServerID -> ProcState
init srvs = MkProcState None srvs Z
{-- Processes themselves.
    A process returns some type 'a', responds to requests on the interface
    'iface', and has an input state and an output state. The output state
    is a function of the returned value.
--}
data Process : (a : Type) -> (iface : Type -> Type) ->
               ProcState -> (a -> ProcState) ->
               Type where
     -- Plumbing: embed an IO action, return a value, and sequence two
     -- processes. None of these touch the state themselves; 'bind' threads
     -- the output state of the first process into the second.
     Lift' : IO a -> Process a iface p (const p)
     Pure : a -> Process a iface p (const p)
     bind : Process a iface p p' ->
            ((x : a) -> Process b iface (p' x) p'') ->
            Process b iface p p''

     -- Start a server process. The server begins with one client and must
     -- finish in 'doneServer'; the caller ends up connected to it.
     Fork : Process () serveri (runningServer 1) (const doneServer) ->
            Process (ProcID serveri) iface p (\res => (newServer res p))

     -- Send a request to a connected server. Requires that no other
     -- request is pending, and marks this one as pending.
     Request : (r : ProcID serveri) -> (x : serveri t) ->
               {auto connected : Connected (MkServer r) p} ->
               {auto nopend : NoPending p} ->
               Process () iface p (const (pendingReq r x p nopend))

     -- Collect the reply to the pending request, clearing it.
     GetReply : (r : ProcID serveri) ->
                {auto pend : Pending p r ty} ->
                Process ty iface p (const (doneReq r p pend))

     -- Answer one incoming request using the given handler, which yields
     -- the reply together with a result. If the timeout expires first,
     -- the result is 'def'.
     TimeoutRespond : (timeout : Int) ->
                      (def : res) ->
                      ({t : Type} -> (x : iface t) ->
                           Process (t, res) iface p (const p)) ->
                      Process res iface p (const p)

     -- As 'TimeoutRespond', but without a timeout.
     Respond : ({t : Type} -> (x : iface t) ->
                    Process (t, res) iface p (const p)) ->
               Process res iface p (const p)

     -- Try to connect to a server. On success ('True') the server is added
     -- to the state; on failure the state is unchanged.
     Connect : (r : ProcID serveri) ->
               Process Bool iface p (\ok => case ok of
                                                 True => newServer r p
                                                 False => p)

     -- Close the connection to a server the process is connected to.
     Disconnect : (r : ProcID serveri) ->
                  {auto connected : Connected (MkServer r) p} ->
                  Process () iface p (const (dropServer r p connected))

     -- Return the current number of clients, and reflect it in the state.
     CountClients : Process Nat iface p (\n => setClients p n)

     -- Wrap a lazily delayed ('Inf') process, for writing looping servers.
     Loop : Inf (Process () iface p p') -> Process () iface p p'
-- 'Running a iface' is the type of a process which is currently serving
-- requests: it starts out knowing that at least one client is connected
-- ('runningServer k', for any k) and may only finish once no clients
-- remain ('doneServer').
Running : Type -> (iface : Type -> Type) -> Type
Running a iface
    = {k : Nat} -> Process a iface (runningServer k) (const doneServer)
-- 'Program a' is the type of a process which does not respond to any
-- requests (its interface is 'const Void'), and which begins and ends
-- with the same list of open server connections.
Program : Type -> Type
Program a
    = {s : List ServerID} -> Process a (const Void) (init s) (const (init s))
-- Implicit conversion from IO, so that a plain IO action can be written
-- wherever a 'Process' is expected. The process state is left unchanged.
implicit
Lift : IO a -> Process a iface p (const p)
Lift action = Lift' action
-- Makes 'do' notation available for processes by delegating to 'bind'.
%no_implicit -- helps error messages, and speeds things up a bit
(>>=) : Process a iface p p' ->
        ((x : a) -> Process b iface (p' x) p'') ->
        Process b iface p p''
(>>=) first next = bind first next
{--- evaluator --}
-- The evaluator keeps track of the number of client connections open,
-- and handles Connect/Disconnect messages whenever a 'Respond' or
-- 'TimeoutRespond' is encountered.
-- Messages exchanged between processes, indexed by the interface of the
-- receiving process: a client connecting, a client closing its connection,
-- or a request on the interface.
data Message : (Type -> Type) -> Type where
     ConnectMsg : Message iface
     CloseMsg : Message iface
     RequestMsg : iface t -> Message iface
-- Fetch the next message, together with its sender, if 'checkMsgs' reports
-- that one is available; otherwise return 'Nothing'.
readMsg : IO (Maybe (Ptr, Message iface))
readMsg {iface} = do
    available <- checkMsgs
    if available
       then do received <- getMsgWithSender {a = Message iface}
               return (Just received)
       else return Nothing
-- As 'readMsg', but checks for a message with 'checkMsgsTimeout', passing
-- the given timeout through to it. Returns 'Nothing' if that check reports
-- no message.
readMsgTimeout : Int -> IO (Maybe (Ptr, Message iface))
readMsgTimeout {iface} timeout = do
    available <- checkMsgsTimeout timeout
    if available
       then do received <- getMsgWithSender {a = Message iface}
               return (Just received)
       else return Nothing
-- Interpret a process in IO.
--
-- 'clients' is the number of clients currently connected to this process.
-- The result pairs the value produced by the process with the client count
-- once it has run. Connect/close messages from clients are consumed (and
-- the count adjusted) while waiting in 'Respond' / 'TimeoutRespond'.
eval : (clients : Nat) -> Process t iface p p' -> IO (t, Nat)
eval clients (Lift' x) = do x' <- x
                            return (x', clients)
eval clients (Pure x) = return (x, clients)
eval clients (bind x f) = do (x', clients') <- eval clients x
                             eval clients' (f x')
-- The forked server starts with a client count of 1, matching the
-- 'runningServer 1' input state in the type of 'Fork'.
eval clients (Fork proc) = do ptr <- fork (do _ <- eval 1 proc
                                              return ())
                              return (MkPID ptr, clients)
eval clients (Request (MkPID pid) x) = do sendToThread pid (RequestMsg x)
                                          return ((), clients)
eval clients (GetReply (MkPID pid)) = do res <- getMsgFrom pid
                                         return (res, clients)
eval {iface} clients (Respond f) = do
    -- 'getMsgWithSender' waits for a message to arrive, so wait on it
    -- directly instead of polling the non-blocking 'readMsg' in a loop.
    (sender, msg) <- getMsgWithSender {a = Message iface}
    case msg of
         ConnectMsg => eval (S clients) (Respond f)
         -- 'pred' keeps the count at zero if a stray close arrives
         CloseMsg => eval (pred clients) (Respond f)
         RequestMsg {t} m =>
              do ((resp, val), clients') <- eval clients (f m)
                 sendToThread sender resp
                 return (val, clients')
eval {iface} clients (TimeoutRespond i def f) = do
    msg <- readMsgTimeout {iface} i
    case msg of
         -- Nothing arrived in time: give back the default result
         Nothing => return (def, clients)
         Just (sender, ConnectMsg) =>
              eval (S clients) (TimeoutRespond i def f)
         Just (sender, CloseMsg) =>
              eval (pred clients) (TimeoutRespond i def f)
         Just (sender, RequestMsg {t} m) =>
              do ((resp, val), clients') <- eval clients (f m)
                 sendToThread sender resp
                 return (val, clients')
eval clients (Connect {serveri} (MkPID pid)) = do
    -- A send result of 1 is treated as a successful connection
    status <- sendToThread pid (ConnectMsg {iface = serveri})
    return (status == 1, clients)
eval clients (Disconnect {serveri} (MkPID pid)) = do
    sendToThread pid (CloseMsg {iface = serveri})
    return ((), clients)
eval clients CountClients = return (clients, clients)
eval clients (Loop x) = eval clients x
-- Run a top-level program: a process with no interface and no open server
-- connections. It is evaluated with a client count of zero, and the final
-- result and count are discarded.
run : Process () (const Void) (init []) (const (init [])) -> IO ()
run prog = do
    _ <- eval 0 prog
    return ()
{--- test ---}
-- The usual test, a simple program which responds to requests to add
-- numbers, and keeps track of the number of operations performed.
-- Interface of the example server. The index is the type of the reply:
-- 'Add' is answered with a 'Double', 'GetOpCount' with an 'Int'.
data Cmd : Type -> Type where
     Add : Double -> Double -> Cmd Double
     GetOpCount : Cmd Int
-- Example server. Its argument is a running counter: answering an 'Add'
-- request replies with the sum and bumps the counter by one, while
-- 'GetOpCount' replies with the counter and leaves it as it is. After each
-- round (request handled or timeout expired) it reports how many clients
-- are connected, and loops for as long as there is at least one.
addServer : Int -> Running () Cmd
addServer uptime
    = do uptime' <- TimeoutRespond 2 uptime
                         (\cmd => case cmd of
                                       Add a b => Pure (a + b, uptime + 1)
                                       GetOpCount => Pure (uptime, uptime))
         n <- CountClients
         putStrLn (show n ++ " clients connected")
         -- Quitting is only permitted when no clients are connected: the
         -- type 'Running () Cmd' demands that the process ends in
         -- 'doneServer'.
         case n of
              Z => putStrLn "No more clients, quitting"
              S k => Loop (addServer uptime')
-- Example client. Each round it connects to the server, reads a line and
-- sends one request: 'Add' if the line consists of exactly two words (cast
-- to doubles), otherwise 'GetOpCount'. It then disconnects, and repeats
-- unless the connection failed or the sum just computed was not positive.
-- Every round has to open its connection and close it again, because the
-- type 'Program ()' requires the server list to end up as it started.
addClient : ProcID Cmd -> Program ()
addClient pid = do True <- Connect pid
                      | False => do putStrLn "Server died"
                                    Pure ()
                   putStr ": "
                   line <- getLine
                   continue <- case getNums (trim line) of
                        Nothing => do Request pid GetOpCount
                                      putStrLn $ "Uptime: " ++ show !(GetReply pid)
                                      Pure True
                        Just (a, b) => do Request pid (Add a b)
                                          putStrLn $ "Sum is: " ++ show !(GetReply pid)
                                          Pure (a + b > 0)
                   Disconnect pid
                   if continue then addClient pid
                               else Pure ()
  where
    -- Two whitespace-separated words become a pair of doubles; any other
    -- input gives 'Nothing'.
    getNums : String -> Maybe (Double, Double)
    getNums input = case words input of
                         [l, r] => Just (cast l, cast r)
                         _ => Nothing
-- Main program: fork the add server, run the client against it, then close
-- the connection that 'Fork' itself opened (the server starts out with one
-- client, per the type of 'Fork').
-- NOTE(review): the final sleep presumably gives the server thread time to
-- notice it has no clients and exit before the program ends -- confirm.
sumProg : Program ()
sumProg = do
    server <- Fork (addServer 0)
    addClient server
    Disconnect server
    usleep 5000000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment