Skip to content

Instantly share code, notes, and snippets.

@mcandre
Forked from billdozr/distributed-ping.hs
Created June 15, 2013 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcandre/5786348 to your computer and use it in GitHub Desktop.
Save mcandre/5786348 to your computer and use it in GitHub Desktop.
{-# LANGUAGE DeriveDataTypeable, GeneralizedNewtypeDeriving #-}
import System.Environment (getArgs, getProgName)
import Control.Monad (forM_, replicateM_)
import Data.Binary (Binary, encode, decode)
import Data.Typeable (Typeable)
import Data.ByteString.Lazy (ByteString)
import Control.Concurrent (threadDelay)
import Data.Rank1Dynamic (toDynamic)
import Control.Distributed.Static
( Static
, Closure(..)
, RemoteTable
, registerStatic
, staticLabel
, staticCompose
)
import Control.Distributed.Process
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Control.Distributed.Process.Serializable (SerializableDict(..))
import Control.Distributed.Process.Backend.SimpleLocalnet
( Backend
, startMaster
, initializeBackend
, newLocalNode
, findPeers
)
-- | Request message handed to a worker. It wraps the 'ProcessId' that
-- 'worker' sends its 'Pong' back to.
newtype Ping = Ping ProcessId
  deriving (Show, Typeable, Binary)
-- | Reply message sent by 'worker'. It wraps the 'ProcessId' of the worker
-- process that produced it.
newtype Pong = Pong ProcessId
  deriving (Show, Typeable, Binary)
-- | Handle a single 'Ping': log its arrival, then answer the process named
-- in the 'Ping' with a 'Pong' carrying this process's own id.
worker :: Ping -> Process ()
worker (Ping master) =
  getSelfPid >>= \self -> do
    say "Got a Ping!"
    send master (Pong self)
-- // Explicitly construct Closures
-- | Static reference to 'worker', looked up by label.
--
-- The label must stay identical to the key under which 'worker' is
-- registered in @main@'s @rtable@.
workerStatic :: Static (Ping -> Process ())
workerStatic = staticLabel label
  where
    label = "$ping.worker"
-- | Static reference to the 'Ping' decoder, looked up by label.
--
-- The label must stay identical to the key under which
-- @decode :: ByteString -> Ping@ is registered in @main@'s @rtable@.
decodePingStatic :: Static (ByteString -> Ping)
decodePingStatic = staticLabel label
  where
    label = "$ping.decodePing"
-- | Build the closure that runs 'worker' on the given 'Ping'.
--
-- The 'Ping' is serialised with 'encode' and stored as the closure's
-- environment. The closure's function is 'decodePingStatic' followed by
-- 'workerStatic', i.e. decode the bytes back into a 'Ping', then run the
-- worker on it.
workerClosure :: Ping -> Closure (Process ())
workerClosure =
  closure (workerStatic `staticCompose` decodePingStatic) . encode
-- //
-- | Process run on a node once it has started; the first argument selects
-- the role and the second is the list of nodes that role works with.
--
-- * @\"WORKER\"@: logs the given peers, registers itself under the name
--   @\"slaveController\"@ and then blocks in 'receiveWait' with no handlers,
--   so it never returns on its own.
-- * @\"MASTER\"@: spawns 'worker' on every given node, waits for one 'Pong'
--   per node, pauses for two seconds and returns.
-- * Any other role: logs the unrecognised role and returns. Without this
--   clause an unexpected role aborted with a pattern-match failure.
initialProcess :: String -> [NodeId] -> Process ()
initialProcess "WORKER" peers = do
  say $ "Peers: " ++ show peers
  pid <- getSelfPid
  -- NOTE(review): "slaveController" is presumably the name the SimpleLocalnet
  -- backend's startMaster looks up to discover workers -- confirm against the
  -- backend documentation before renaming.
  register "slaveController" pid
  receiveWait []
initialProcess "MASTER" workers = do
  say $ "Workers: " ++ show workers
  pid <- getSelfPid
  forM_ workers $ \w -> do
    say $ "Sending a Ping to " ++ show w ++ "..."
    spawn w (workerClosure (Ping pid))
  let expected = length workers
  say $ "Waiting for reply from " ++ show expected ++ " worker(s)"
  -- Exactly one Pong is awaited per spawned worker.
  replicateM_ expected $ do
    wId <- receiveWait [match (\(Pong wId) -> return wId)]
    say $ "Got back a Pong from " ++ show (processNodeId wId) ++ "!"
  -- Wait a bit before return.
  -- NOTE(review): presumably gives pending log output time to arrive before
  -- the master finishes -- confirm.
  liftIO (threadDelay 2000000)
initialProcess role _ =
  say $ "Unknown role: " ++ show role
-- | Command-line entry point.
--
-- Expected arguments: @(master | worker) host port@.
--
-- * @master host port@: initialises the backend and runs
--   @initialProcess \"MASTER\"@ through 'startMaster'.
-- * @worker host port@: initialises the backend, creates a local node,
--   looks for peers and runs @initialProcess \"WORKER\"@ on that node.
-- * Anything else: prints a usage line to standard output.
main :: IO ()
main = do
  prog <- getProgName
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port rtable
      startMaster backend (initialProcess "MASTER")
    ["worker", host, port] -> do
      backend <- initializeBackend host port rtable
      node <- newLocalNode backend
      -- NOTE(review): 50000 is the peer-discovery timeout; the unit is
      -- presumably microseconds -- confirm against the backend docs.
      peers <- findPeers backend 50000
      runProcess node (initialProcess "WORKER" peers)
    _ ->
      putStrLn $ "usage: " ++ prog ++ " (master | worker) host port"
  where
    -- Remote table shared by both roles. The keys must match the labels
    -- used by 'workerStatic' and 'decodePingStatic'.
    rtable :: RemoteTable
    rtable =
      registerStatic "$ping.worker" (toDynamic worker)
        . registerStatic "$ping.decodePing"
            (toDynamic (decode :: ByteString -> Ping))
        $ initRemoteTable
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment