Skip to content

Instantly share code, notes, and snippets.

@zelinskiy
Forked from BekaValentine/iohk.hs
Created November 12, 2017 07:19
Show Gist options
  • Save zelinskiy/647a907c296bde5bfae294f5bf56bd91 to your computer and use it in GitHub Desktop.
Save zelinskiy/647a907c296bde5bfae294f5bf56bd91 to your computer and use it in GitHub Desktop.
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}
import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Monad (forM,forM_)
import Data.Binary
import Data.List (sort)
import GHC.Generics
import System.Environment (getArgs)
import System.Console.GetOpt
import System.Random
-- This program consists of two main kinds of processing nodes, Queen and
-- Drones. The Queen serves to coordinate setup of processes and distribution
-- of information among Drones, and tells the Drones to begin. The Drones
-- themselves do all the actual transmission with process-to-process messages.
-- | A `DroneMessage` is how the Drones communicate numbers to one another.
data DroneMessage
= Number Float
deriving (Show,Generic)
instance Binary DroneMessage
-- | A `TimeoutMessage` is a message from a timeout process to a receiver
-- process telling it to stop receiving because the grace period is over.
data TimeoutMessage
= Finished
deriving (Show,Generic)
instance Binary TimeoutMessage
-- | A drone node runs by waitin for its peer info and its RNG seed, and then
-- spawning two sub-processes: a transmitter process that will send messages
-- every `messageInterval` until the `transmitTime` runs out, and a
-- `graceTimeout` process, which will wait for `transmitTime + graceTime` then
-- send a `Finished` message to the main drone process, after which time the
-- drone will complete its task.
--
-- The transmitter is initialized with some random floats, from which it draws
-- its numbers for transmission.
drone :: (Int,Int) -> Process ()
drone (transmitTime, graceTime) = do
liftIO $ putStrLn "Drone initiated. Waiting for setup info from queen."
(prs, rs) <- expect :: Process ([ProcessId], Int)
liftIO $ putStrLn "Spawning transmitter and timout."
selfPid <- getSelfPid
_ <- spawnLocal
(transmitter
prs
(randomRs (0,1) (mkStdGen rs) :: [Float])
transmitTime)
_ <- spawnLocal (graceTimeout selfPid)
liftIO $ putStrLn "Receiving transmissions."
receiver []
where
-- Please customize this as desired.
messageInterval = 100000
-- | The transmitter has some peer `ProcessId`s, the RNG seed, and the
-- remaining time as argument, and so long as there's time remaining,
-- sends the next random number. It then repeats, with less time, after
-- the `messageInterval`.
transmitter prs rs t | t < 0 =
liftIO $ putStrLn "Finished transmitting."
transmitter prs rs t = do
liftIO $ putStrLn $ "Transmitting " ++ show (head rs)
forM_ prs $ \pid -> send pid (Number (head rs))
liftIO $ threadDelay messageInterval
transmitter prs (tail rs) (t - messageInterval)
-- | The timeout simply waits and then sends a `Finished` message.
graceTimeout pid = do
liftIO $ threadDelay (transmitTime + graceTime)
send pid Finished
-- | The receiver keeps a running list of the numbers it's received. Each
-- cycle, it waits for one of two kinds of messages, either a `Number i`
-- message telling it a number, after which it adds that number to the
-- list and cycles again, or a `Finished` message, after which it performs
-- the final display of information.
receiver nums = do
receiveWait
[ match (\(Number i) -> do
liftIO $ putStrLn $ "Received " ++ show i
receiver (i:nums))
, match (\Finished -> liftIO $ do
liftIO $ putStrLn "Finished grace period."
let rev = reverse nums
l = length rev
summed = sum (zipWith (+) [1..fromIntegral l] rev)
liftIO $ putStrLn $ "Final tuple: " ++ show (l, summed))
]
remotable ['drone]
-- | A queen process splits an initial random seed into subseeds for each
-- drone process. It then spawns a drone on each node and distributes
-- information to them, thus starting them up. After a period of
-- `sendfor + waitfor + 1000000`, the queen kills off the drones and exits.
queen :: Int -> Int -> Int -> Backend -> [NodeId] -> Process ()
queen sendfor waitfor seed backend drones = do
liftIO $ putStrLn "Queen initiated."
let randomGen = mkStdGen seed
nodeCount = length drones
subseeds = take nodeCount (randoms randomGen :: [Int])
liftIO $ putStrLn "Spawning drones."
pids <- forM drones $ \nid ->
spawn nid $ $(mkClosure 'drone) (sendfor, waitfor)
liftIO $ putStrLn "Sending setup info to drones and beginning."
forM_ (zip pids subseeds) $ \(pid,subseed) -> send pid (pids, subseed)
liftIO $ threadDelay (sendfor + waitfor + 1000000)
liftIO $ putStrLn "Finished. Sending end signal."
terminateAllSlaves backend
-- | There are three kinds of flags for command line arguments:
-- `SendFor` corresponds to the --send-for arg, and has a time argument
-- `WaitFor` corresponds to the --wait-for arg, and has a time argument
-- `WithSeed` corresponds to the -with-seed arg, and has a number argument
data Flags = SendFor Int | WaitFor Int | WithSeed Int
deriving (Show,Eq,Ord)
myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable
-- | The main IO action uses a standard GetOpts options parser to handle
-- command line options. By default, there are always three un-flagged
-- arguments: a node/process type (either `queen` or `node`), the IP address
-- of the node, and the port for the node to operate over. The additional
-- flagged arguments are required for drones only.
main :: IO ()
main = do
args <- getArgs
case args of
("queen" : host : port : rest) -> do
case getOpt Permute options rest of
(sort -> [SendFor l, WaitFor k, WithSeed s], [], []) -> do
backend <- initializeBackend host port myRemoteTable
liftIO $ threadDelay 1000000
startMaster backend (queen l k s backend)
(opts, [], []) ->
error $ "incorrect arguments: " ++ show opts
(_, nonOpts, []) ->
error $ "unrecognized arguments: " ++ unwords nonOpts
(_, _, msgs) ->
error $ concat msgs ++ usageInfo header options
["drone", host, port] -> do
backend <- initializeBackend host port myRemoteTable
startSlave backend
where
options :: [OptDescr Flags]
options =
[ Option []
["send-for"]
(ReqArg (SendFor . read) "TIME")
"transmission period"
, Option []
["wait-for"]
(ReqArg (WaitFor . read) "TIME")
"grace period"
, Option []
["with-seed"]
(ReqArg (WithSeed . read) "NUMBER")
"RNG seed"
]
header = "Usage: <name> queen/drone IP PORT [OPTION...]"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment