Skip to content

Instantly share code, notes, and snippets.

@edwinb
Last active August 29, 2015 14:23
Show Gist options
  • Save edwinb/45f1b2cda00f20a47775 to your computer and use it in GitHub Desktop.
Save edwinb/45f1b2cda00f20a47775 to your computer and use it in GitHub Desktop.
More concurrency in Idris
module Main
import System.Concurrency.Raw
%access public
-- A 'ProcID' is a handle on a process which accepts requests. It is indexed
-- by 'resp', a function which calculates the type of the response the
-- process sends back for a given request; the request type 'req' is implicit.
-- The constructor wraps the raw 'Ptr' which the rest of this file passes to
-- 'sendToThread' and 'getMsgFrom'. The type is declared 'abstract'.
abstract
data ProcID : (resp : req -> Type) -> Type where
     MkPID : Ptr -> ProcID {req} resp
-- Records the request, if any, which has been sent but not yet answered:
--   None    - nothing is outstanding
--   Pending - a request has been sent to the given process, paired with the
--             type of the reply we are waiting for
data ReqPending : Type where
     None    : ReqPending
     Pending : (ProcID {req} resp, Type) -> ReqPending
-- A 'Process' describes a concurrent computation. Its indices are, in order:
--   resp       - the type of response for each request it serves (as ProcID)
--   running    - True if it processes requests forever
--   awake      - True if it contains a 'Respond' (or one of its variants)
--   ReqPending - the outstanding request before the process runs...
--   ReqPending - ...and the outstanding request after it has run
--   Type       - the type of the result
-- From the type we can therefore tell whether a process
--   never responds                          (awake = False),
--   contains a Respond but does not loop    (awake = True, running = False),
--   or responds to requests forever         (awake = True, running = True).
-- Tracking the outstanding request allows a process to send a message and do
-- other work while the server is computing its response.
data Process : (resp : req -> Type) ->
               (running : Bool) ->  -- running forever
               (awake : Bool) ->    -- contains a Respond
               ReqPending -> ReqPending ->
               Type -> Type where

     -- Plumbing: embed an IO action, inject a pure value, and sequence two
     -- processes. Sequencing takes 'running' from the second process and is
     -- 'awake' if either part is.
     Lift' : IO a -> Process resp False False p p a
     Pure  : a -> Process resp False False p p a
     (>>=) : Process resp r awake p p' a ->
             (a -> Process resp r' awake' p' p'' b) ->
             Process resp r' (awake || awake') p p'' b

     -- Serve one request from some client. For a request 'x' the handler
     -- calculates a pair: the response (of type 'resp x'), which is sent
     -- back, and a second value which becomes the result - useful for
     -- updating internal state, for example.
     -- Blocks if there are no requests waiting.
     Respond : ((x : req) -> Process {req} resp False False p p (resp x, t)) ->
               Process resp False True p p t

     -- As Respond, but the result is the given default if no message is
     -- waiting.
     TryRespond : t ->
                  ((x : req) -> Process {req} resp False False p p (resp x, t)) ->
                  Process {req} resp False True p p t

     -- As Respond, but takes a timeout, and the result is the given default
     -- if the timeout passes.
     TimeoutRespond : Int -> t ->
                      ((x : req) -> Process {req} resp False False p p (resp x, t)) ->
                      Process {req} resp False True p p t

     -- Send a request to a process. Only allowed when no other request is
     -- outstanding (since we can't guarantee the order of responses).
     -- Afterwards a reply of type 'resp' x' from that process is pending.
     Request : (proc : ProcID {req = req'} resp') -> (x : req') ->
               Process resp False False None (Pending (proc, resp' x)) ()

     -- Get the response from a process, as long as we're waiting for one
     -- from it. Afterwards nothing is pending.
     GetReply : (proc : ProcID {req = req'} resp') ->
                Process resp False False (Pending (proc, ty)) None ty

     -- Continue processing requests recursively. This is the only way for
     -- 'running' to be True, so a process which handles requests forever
     -- has to use Loop.
     Loop : Inf (Process resp True True p p t) ->
            Process resp True False p p t

     -- Start a new server process and return its ID. The ID can be shared
     -- between multiple programs and threads.
     StartServer : Process {req = req'} resp' True True p p () ->
                   Process resp False False p p (ProcID {req = req'} resp')

     -- Start a new process with no restrictions on it. We can't communicate
     -- with this process, since there are no guarantees about how long it
     -- will run.
     Fork : Process resp' r a None None () ->
            Process resp False False p p ()

     -- Fork a child process which is handed our own ID, so that it can send
     -- us messages on our interface. We are guaranteed to respond to them
     -- because the process we continue as runs forever.
     StartClient : (ProcID {req} resp -> Process resp' r a None None ()) ->
                   Process {req} resp True True p p () ->
                   Process {req} resp True True p p ()
-- A server for the interface 'resp': a process which runs forever
-- (running = True), contains a Respond (awake = True), and has no request
-- outstanding either before or after it runs.
Server : (resp : req -> Type) -> Type -> Type
Server resp = Process resp True True None None
-- A program which serves no requests at all: its request type is Void,
-- 'running' and 'awake' are both False, and it has no request outstanding
-- either before or after it runs.
Program : Type -> Type
Program t = Process {req = Void} (const Void) False False None None t
-- Embed an IO action in a Process (a synonym for the Lift' constructor).
-- It is declared 'implicit' - presumably this is what lets the example
-- further down use putStrLn and getLine directly inside Process do-blocks.
implicit
Lift : IO a -> Process resp False False p p a
Lift = Lift'
-- Send the request 'x' to a server from plain IO and wait for its response.
-- There's no need to be in 'Process' to send a message to a server, once
-- that server process is running, because we know it's continuing to be
-- ready to respond!
send : ProcID {req} resp -> (x : req) -> IO (resp x)
send (MkPID pid) x = do sendToThread pid x
                        -- The reply is the result of the whole block, so
                        -- there is no need to bind it and 'return' it again.
                        getMsgFrom pid
-- Send a request to a process and wait for the answer in one step: a
-- 'Request' followed directly by the matching 'GetReply'. No request is
-- outstanding before or after, and the result is the reply itself.
Send : ProcID {req = req'} resp' -> (x : req') ->
       Process resp False False None None (resp' x)
Send proc x = do Request proc x
                 GetReply proc
-- Run the process DSL: 'eval' executes a Process description as an IO
-- action, and 'respond' is the receive/compute/reply step shared by the
-- three Respond variants.
mutual
  -- Receive a message together with its sender, run the handler on the
  -- message, send the first component of the handler's result back to the
  -- sender, and return the second component.
  respond : (req : Type) ->
            ((x : req) -> Process {req} resp False False p p (resp x, t)) ->
            IO t
  respond req handler = do (sender, msg) <- recv
                           (reply, result) <- eval (handler msg)
                           sendToThread sender reply
                           return result
    where
      -- Pins the type of the incoming message to 'req'.
      recv : IO (Ptr, req)
      recv = getMsgWithSender

  -- Execute a process description in IO.
  eval : Process {req} resp s s' p p' t -> IO t
  eval (Lift' act) = act
  eval (Pure v) = pure v
  eval (m >>= k) = do r <- eval m
                      eval (k r)
  eval {req} (Respond f) = respond req f
  -- Only respond if a message is already waiting; otherwise use the default.
  eval {req} (TryRespond dflt f)
       = do waiting <- checkMsgs
            if waiting then respond req f else return dflt
  -- As above, but checking for messages with the given timeout.
  eval {req} (TimeoutRespond delay dflt f)
       = do waiting <- checkMsgsTimeout delay
            if waiting then respond req f else return dflt
  eval (Request (MkPID pid) x) = sendToThread pid x
  eval (GetReply {ty} (MkPID pid)) = getMsgFrom pid
  eval (Loop x) = eval x
  -- The pointer returned by 'fork' becomes the new server's ID.
  eval (StartServer server) = do ptr <- fork (eval server)
                                 return (MkPID ptr)
  -- The forked process can't be contacted, so the result of 'fork' is dropped.
  eval (Fork child) = do fork (eval child)
                         return ()
  -- The child is handed a ProcID built from prim__vm.
  -- NOTE(review): presumably prim__vm denotes this (parent) process here, so
  -- the child can send requests back to us - confirm it is not evaluated
  -- inside the forked thread.
  eval (StartClient f server) = do fork (eval (f (MkPID prim__vm)))
                                   eval server
-- Run a process which has no request outstanding at the start or at the
-- end, as an IO action.
run : Process resp running awake None None t -> IO t
run proc = eval proc
{------------ EXAMPLE ------------}
-- The requests understood by the example server. There are two commands:
-- add two numbers, or return the number of operations already performed.
data Command : Type where
     Add        : Int -> Int -> Command
     GetOpCount : Command
-- The type of the response to each command; both are answered with an Int.
Resp : Command -> Type
Resp (Add _ _) = Int
Resp GetOpCount = Int
-- The example server. Its argument is the internal state - a count of
-- operations - which is passed on to the recursive call, so the server
-- responds to requests forever.
-- Each round waits for a request with a timeout argument of 10 (the count is
-- the default result if the timeout passes), logs, and loops:
--   Add a b    - responds with a + b, and the count goes up by one
--   GetOpCount - responds with the count, which stays as it is
addServer : Int -> Server Resp ()
addServer ops = do putStrLn "Awaiting input"
                   ops' <- TimeoutRespond 10 ops
                               (\cmd => case cmd of
                                             Add a b => Pure (a + b, ops + 1)
                                             GetOpCount => Pure (ops, ops))
                   putStrLn $ "Response done, uptime " ++ show ops'
                   Loop (addServer ops')
-- FIXME: This ugly thing is to make the 'case' below know its type.
-- It is the identity function restricted to 'Program ()'; countProg applies
-- it to its 'case' expression to give that expression a known type.
simple : Program () -> Program ()
simple x = x
-- A program which uses the server. It prompts with ": " and reads a line.
-- If the line is exactly two words, they are cast to Ints and sent as an
-- 'Add' request, and the sum is printed; for any other input it sends
-- 'GetOpCount' and prints the answer. Then it starts again.
countProg : ProcID Resp -> Program ()
countProg proc = do putStr ": "
                    line <- getLine
                    simple $ case getNums (trim line) of
                         Nothing => do ops <- Send proc GetOpCount
                                       putStrLn $ "Uptime: " ++ show ops
                         Just (a, b) => do res <- Send proc (Add a b)
                                           putStrLn $ "Sum is: " ++ show res
                    countProg proc
  where
    -- Split the input into words; exactly two words give a pair of Ints,
    -- anything else gives Nothing.
    getNums : String -> Maybe (Int, Int)
    getNums xs = case words xs of
                      [l, r] => Just (cast l, cast r)
                      _ => Nothing
-- Finally, the main program: start the example server with an operation
-- count of 0, then run the interactive client against it.
main : IO ()
main = run (do srv <- StartServer (addServer 0)
               countProg srv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment