-
-
Save zelinskiy/647a907c296bde5bfae294f5bf56bd91 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DeriveGeneric #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE StandaloneDeriving #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
{-# LANGUAGE ViewPatterns #-} | |
import Control.Concurrent (threadDelay) | |
import Control.Distributed.Process | |
import Control.Distributed.Process.Closure | |
import Control.Distributed.Process.Node | |
import Control.Distributed.Process.Backend.SimpleLocalnet | |
import Control.Monad (forM,forM_) | |
import Data.Binary | |
import Data.List (sort) | |
import GHC.Generics | |
import System.Environment (getArgs) | |
import System.Console.GetOpt | |
import System.Random | |
-- This program consists of two main kinds of processing nodes, Queen and | |
-- Drones. The Queen serves to coordinate setup of processes and distribution | |
-- of information among Drones, and tells the Drones to begin. The Drones | |
-- themselves do all the actual transmission with process-to-process messages. | |
-- | A `DroneMessage` is how the Drones communicate numbers to one another. | |
data DroneMessage | |
= Number Float | |
deriving (Show,Generic) | |
instance Binary DroneMessage | |
-- | A `TimeoutMessage` is a message from a timeout process to a receiver | |
-- process telling it to stop receiving because the grace period is over. | |
data TimeoutMessage | |
= Finished | |
deriving (Show,Generic) | |
instance Binary TimeoutMessage | |
-- | A drone node runs by waitin for its peer info and its RNG seed, and then | |
-- spawning two sub-processes: a transmitter process that will send messages | |
-- every `messageInterval` until the `transmitTime` runs out, and a | |
-- `graceTimeout` process, which will wait for `transmitTime + graceTime` then | |
-- send a `Finished` message to the main drone process, after which time the | |
-- drone will complete its task. | |
-- | |
-- The transmitter is initialized with some random floats, from which it draws | |
-- its numbers for transmission. | |
drone :: (Int,Int) -> Process () | |
drone (transmitTime, graceTime) = do | |
liftIO $ putStrLn "Drone initiated. Waiting for setup info from queen." | |
(prs, rs) <- expect :: Process ([ProcessId], Int) | |
liftIO $ putStrLn "Spawning transmitter and timout." | |
selfPid <- getSelfPid | |
_ <- spawnLocal | |
(transmitter | |
prs | |
(randomRs (0,1) (mkStdGen rs) :: [Float]) | |
transmitTime) | |
_ <- spawnLocal (graceTimeout selfPid) | |
liftIO $ putStrLn "Receiving transmissions." | |
receiver [] | |
where | |
-- Please customize this as desired. | |
messageInterval = 100000 | |
-- | The transmitter has some peer `ProcessId`s, the RNG seed, and the | |
-- remaining time as argument, and so long as there's time remaining, | |
-- sends the next random number. It then repeats, with less time, after | |
-- the `messageInterval`. | |
transmitter prs rs t | t < 0 = | |
liftIO $ putStrLn "Finished transmitting." | |
transmitter prs rs t = do | |
liftIO $ putStrLn $ "Transmitting " ++ show (head rs) | |
forM_ prs $ \pid -> send pid (Number (head rs)) | |
liftIO $ threadDelay messageInterval | |
transmitter prs (tail rs) (t - messageInterval) | |
-- | The timeout simply waits and then sends a `Finished` message. | |
graceTimeout pid = do | |
liftIO $ threadDelay (transmitTime + graceTime) | |
send pid Finished | |
-- | The receiver keeps a running list of the numbers it's received. Each | |
-- cycle, it waits for one of two kinds of messages, either a `Number i` | |
-- message telling it a number, after which it adds that number to the | |
-- list and cycles again, or a `Finished` message, after which it performs | |
-- the final display of information. | |
receiver nums = do | |
receiveWait | |
[ match (\(Number i) -> do | |
liftIO $ putStrLn $ "Received " ++ show i | |
receiver (i:nums)) | |
, match (\Finished -> liftIO $ do | |
liftIO $ putStrLn "Finished grace period." | |
let rev = reverse nums | |
l = length rev | |
summed = sum (zipWith (+) [1..fromIntegral l] rev) | |
liftIO $ putStrLn $ "Final tuple: " ++ show (l, summed)) | |
] | |
remotable ['drone] | |
-- | A queen process splits an initial random seed into subseeds for each | |
-- drone process. It then spawns a drone on each node and distributes | |
-- information to them, thus starting them up. After a period of | |
-- `sendfor + waitfor + 1000000`, the queen kills off the drones and exits. | |
queen :: Int -> Int -> Int -> Backend -> [NodeId] -> Process () | |
queen sendfor waitfor seed backend drones = do | |
liftIO $ putStrLn "Queen initiated." | |
let randomGen = mkStdGen seed | |
nodeCount = length drones | |
subseeds = take nodeCount (randoms randomGen :: [Int]) | |
liftIO $ putStrLn "Spawning drones." | |
pids <- forM drones $ \nid -> | |
spawn nid $ $(mkClosure 'drone) (sendfor, waitfor) | |
liftIO $ putStrLn "Sending setup info to drones and beginning." | |
forM_ (zip pids subseeds) $ \(pid,subseed) -> send pid (pids, subseed) | |
liftIO $ threadDelay (sendfor + waitfor + 1000000) | |
liftIO $ putStrLn "Finished. Sending end signal." | |
terminateAllSlaves backend | |
-- | There are three kinds of flags for command line arguments: | |
-- `SendFor` corresponds to the --send-for arg, and has a time argument | |
-- `WaitFor` corresponds to the --wait-for arg, and has a time argument | |
-- `WithSeed` corresponds to the -with-seed arg, and has a number argument | |
data Flags = SendFor Int | WaitFor Int | WithSeed Int | |
deriving (Show,Eq,Ord) | |
myRemoteTable :: RemoteTable | |
myRemoteTable = Main.__remoteTable initRemoteTable | |
-- | The main IO action uses a standard GetOpts options parser to handle | |
-- command line options. By default, there are always three un-flagged | |
-- arguments: a node/process type (either `queen` or `node`), the IP address | |
-- of the node, and the port for the node to operate over. The additional | |
-- flagged arguments are required for drones only. | |
main :: IO () | |
main = do | |
args <- getArgs | |
case args of | |
("queen" : host : port : rest) -> do | |
case getOpt Permute options rest of | |
(sort -> [SendFor l, WaitFor k, WithSeed s], [], []) -> do | |
backend <- initializeBackend host port myRemoteTable | |
liftIO $ threadDelay 1000000 | |
startMaster backend (queen l k s backend) | |
(opts, [], []) -> | |
error $ "incorrect arguments: " ++ show opts | |
(_, nonOpts, []) -> | |
error $ "unrecognized arguments: " ++ unwords nonOpts | |
(_, _, msgs) -> | |
error $ concat msgs ++ usageInfo header options | |
["drone", host, port] -> do | |
backend <- initializeBackend host port myRemoteTable | |
startSlave backend | |
where | |
options :: [OptDescr Flags] | |
options = | |
[ Option [] | |
["send-for"] | |
(ReqArg (SendFor . read) "TIME") | |
"transmission period" | |
, Option [] | |
["wait-for"] | |
(ReqArg (WaitFor . read) "TIME") | |
"grace period" | |
, Option [] | |
["with-seed"] | |
(ReqArg (WithSeed . read) "NUMBER") | |
"RNG seed" | |
] | |
header = "Usage: <name> queen/drone IP PORT [OPTION...]" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment