@BekaValentine
Last active November 12, 2017 07:19
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}
import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Monad (forM,forM_)
import Data.Binary
import Data.List (sort)
import GHC.Generics
import System.Environment (getArgs)
import System.Console.GetOpt
import System.Random
-- This program consists of two main kinds of processing nodes, Queen and
-- Drones. The Queen serves to coordinate setup of processes and distribution
-- of information among Drones, and tells the Drones to begin. The Drones
-- themselves do all the actual transmission with process-to-process messages.
-- | A `DroneMessage` is how the Drones communicate numbers to one another.
data DroneMessage
  = Number Float
  deriving (Show,Generic)
instance Binary DroneMessage
-- | A `TimeoutMessage` is a message from a timeout process to a receiver
-- process telling it to stop receiving because the grace period is over.
data TimeoutMessage
  = Finished
  deriving (Show,Generic)
instance Binary TimeoutMessage
-- | A drone node runs by waiting for its peer info and its RNG seed, and then
-- spawning two sub-processes: a transmitter process that will send messages
-- every `messageInterval` until the `transmitTime` runs out, and a
-- `graceTimeout` process, which will wait for `transmitTime + graceTime` then
-- send a `Finished` message to the main drone process, after which time the
-- drone will complete its task.
--
-- The transmitter is initialized with some random floats, from which it draws
-- its numbers for transmission.
drone :: (Int,Int) -> Process ()
drone (transmitTime, graceTime) = do
    liftIO $ putStrLn "Drone initiated. Waiting for setup info from queen."
    (prs, rs) <- expect :: Process ([ProcessId], Int)
    liftIO $ putStrLn "Spawning transmitter and timout."
    selfPid <- getSelfPid
    _ <- spawnLocal
           (transmitter
              prs
              (randomRs (0,1) (mkStdGen rs) :: [Float])
              transmitTime)
    _ <- spawnLocal (graceTimeout selfPid)
    liftIO $ putStrLn "Receiving transmissions."
    receiver []
  where
    -- Interval between transmissions, in microseconds (it is passed to
    -- `threadDelay`). Please customize this as desired.
    messageInterval = 100000
    -- | The transmitter takes the peer `ProcessId`s, the stream of random
    -- numbers, and the remaining time as arguments. So long as there's time
    -- remaining, it sends the next random number to every peer. It then
    -- repeats, with less time, after the `messageInterval`.
    transmitter prs rs t | t < 0 =
      liftIO $ putStrLn "Finished transmitting."
    transmitter prs rs t = do
      liftIO $ putStrLn $ "Transmitting " ++ show (head rs)
      forM_ prs $ \pid -> send pid (Number (head rs))
      liftIO $ threadDelay messageInterval
      transmitter prs (tail rs) (t - messageInterval)
    -- | The timeout simply waits and then sends a `Finished` message.
    graceTimeout pid = do
      liftIO $ threadDelay (transmitTime + graceTime)
      send pid Finished
    -- | The receiver keeps a running list of the numbers it's received. Each
    -- cycle, it waits for one of two kinds of messages: either a `Number i`
    -- message, after which it adds that number to the list and cycles again,
    -- or a `Finished` message, after which it prints the final tuple: the
    -- count of received numbers and the sum of each number plus its 1-based
    -- arrival index.
    receiver nums = do
      receiveWait
        [ match (\(Number i) -> do
            liftIO $ putStrLn $ "Received " ++ show i
            receiver (i:nums))
        , match (\Finished -> liftIO $ do
            liftIO $ putStrLn "Finished grace period."
            let rev = reverse nums
                l = length rev
                summed = sum (zipWith (+) [1..fromIntegral l] rev)
            liftIO $ putStrLn $ "Final tuple: " ++ show (l, summed))
        ]
remotable ['drone]
-- | A queen process splits an initial random seed into subseeds for each
-- drone process. It then spawns a drone on each node and distributes
-- information to them, thus starting them up. After a period of
-- `sendfor + waitfor + 1000000`, the queen kills off the drones and exits.
queen :: Int -> Int -> Int -> Backend -> [NodeId] -> Process ()
queen sendfor waitfor seed backend drones = do
    liftIO $ putStrLn "Queen initiated."
    let randomGen = mkStdGen seed
        nodeCount = length drones
        subseeds = take nodeCount (randoms randomGen :: [Int])
    liftIO $ putStrLn "Spawning drones."
    pids <- forM drones $ \nid ->
      spawn nid $ $(mkClosure 'drone) (sendfor, waitfor)
    liftIO $ putStrLn "Sending setup info to drones and beginning."
    forM_ (zip pids subseeds) $ \(pid,subseed) -> send pid (pids, subseed)
    liftIO $ threadDelay (sendfor + waitfor + 1000000)
    liftIO $ putStrLn "Finished. Sending end signal."
    terminateAllSlaves backend
-- | There are three kinds of flags for command line arguments:
-- `SendFor` corresponds to the --send-for arg, and has a time argument
-- `WaitFor` corresponds to the --wait-for arg, and has a time argument
-- `WithSeed` corresponds to the --with-seed arg, and has a number argument
data Flags = SendFor Int | WaitFor Int | WithSeed Int
  deriving (Show,Eq,Ord)
myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable
-- | The main IO action uses a standard GetOpt options parser to handle
-- command line options. There are always three un-flagged arguments: a
-- node/process type (either `queen` or `drone`), the IP address of the
-- node, and the port for the node to operate over. The additional flagged
-- arguments are required for the queen only.
main :: IO ()
main = do
    args <- getArgs
    case args of
      ("queen" : host : port : rest) -> do
        case getOpt Permute options rest of
          (sort -> [SendFor l, WaitFor k, WithSeed s], [], []) -> do
            backend <- initializeBackend host port myRemoteTable
            liftIO $ threadDelay 1000000
            startMaster backend (queen l k s backend)
          (opts, [], []) ->
            error $ "incorrect arguments: " ++ show opts
          (_, nonOpts, []) ->
            error $ "unrecognized arguments: " ++ unwords nonOpts
          (_, _, msgs) ->
            error $ concat msgs ++ usageInfo header options
      ["drone", host, port] -> do
        backend <- initializeBackend host port myRemoteTable
        startSlave backend
      -- Any other argument shape: print the usage text instead of failing
      -- with a non-exhaustive pattern match.
      _ ->
        error $ usageInfo header options
  where
    options :: [OptDescr Flags]
    options =
      [ Option []
               ["send-for"]
               (ReqArg (SendFor . read) "TIME")
               "transmission period"
      , Option []
               ["wait-for"]
               (ReqArg (WaitFor . read) "TIME")
               "grace period"
      , Option []
               ["with-seed"]
               (ReqArg (WithSeed . read) "NUMBER")
               "RNG seed"
      ]
    header = "Usage: <name> queen/drone IP PORT [OPTION...]"

BekaValentine commented Sep 24, 2016

To compile, the cabal build-depends are:

base, binary, distributed-process, distributed-process-simplelocalnet, mtl, random

To run, start up drones on whichever machines you like, e.g.

machine0 $ <name> drone 127.0.0.1 8080
machine1 $ <name> drone 127.0.0.1 8080

Then start a queen on any machine, e.g.

machine0 $ <name> queen 127.0.0.1 8081 --send-for 1000000 --wait-for 1000000 --with-seed 0
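
The three flags can be given in any order because the parsed flag list is sorted before it is pattern-matched, and the derived `Ord` instance orders constructors by declaration. Here is a minimal standalone sketch of that trick, using only `base`. `parseQueenFlags` is a hypothetical helper introduced for illustration; the gist does this matching inline in `main`.

```haskell
{-# LANGUAGE ViewPatterns #-}
import Data.List (sort)
import System.Console.GetOpt

-- Same shape as the gist's Flags type. The derived Ord instance orders the
-- constructors by declaration: SendFor < WaitFor < WithSeed.
data Flags = SendFor Int | WaitFor Int | WithSeed Int
  deriving (Show, Eq, Ord)

options :: [OptDescr Flags]
options =
  [ Option [] ["send-for"]  (ReqArg (SendFor . read) "TIME")    "transmission period"
  , Option [] ["wait-for"]  (ReqArg (WaitFor . read) "TIME")    "grace period"
  , Option [] ["with-seed"] (ReqArg (WithSeed . read) "NUMBER") "RNG seed"
  ]

-- | Hypothetical helper (not in the gist): returns (sendFor, waitFor, seed)
-- when each flag appears exactly once, in any order, and Nothing otherwise.
parseQueenFlags :: [String] -> Maybe (Int, Int, Int)
parseQueenFlags args =
  case getOpt Permute options args of
    -- Sorting puts the flags into constructor order, so one pattern covers
    -- every permutation of the three flags.
    (sort -> [SendFor l, WaitFor k, WithSeed s], [], []) -> Just (l, k, s)
    _ -> Nothing
```

A missing or repeated flag changes the length or shape of the sorted list, so the pattern fails and the caller can fall through to an error message.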

The IP/port arguments tell each node its own address. I used a master-slave configuration for setup so that the processes will automatically communicate with one another correctly, without a fixed list of nodes.

If two nodes are run on the same machine, different IP-port combinations must be used. Make sure firewalls aren't blocking the nodes. On my MacBook, I have to manually authorize the binary once after compilation; your setup will likely vary.

Each node, including the queen, will log information to the command line output. Drones will finish by logging the final tuple. Here is an example run:

$ iohk drone localhost 8080
Drone initiated. Waiting for setup info from queen.
Spawning transmitter and timout.
Transmitting 0.9306598
Receiving transmissions.
Received 0.9306598
Received 0.6170139
Received 0.19317281
Transmitting 0.6123277
Received 0.6123277
Finished grace period.
Final tuple: (4,12.353174)
Finished transmitting.



$ iohk drone localhost 8081
Drone initiated. Waiting for setup info from queen.
Spawning transmitter and timout.
Receiving transmissions.
Transmitting 0.6170139
Received 0.6170139
Received 0.9306598
Transmitting 0.19317281
Received 0.19317281
Received 0.6123277
Finished grace period.
Final tuple: (4,12.353174)
Finished transmitting.



$ iohk queen localhost 8082 --send-for 100000 --wait-for 100000 --with-seed 7
Queen initiated.
Spawning drones.
Sending setup info to drones and beginning.
Finished. Sending end signal.
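
The numbers in this run can be checked by hand. Below is a small pure sketch of the two calculations, assuming the gist's `messageInterval` of 100000 microseconds. `transmissionCount` and `finalTuple` are hypothetical names introduced for illustration. They mirror the transmitter's loop condition and the receiver's final sum, but they are not part of the gist.

```haskell
-- | Number of transmissions one drone makes. The transmitter sends while the
-- remaining time is >= 0 and subtracts one interval after each send.
-- Assumes a positive interval.
transmissionCount :: Int -> Int -> Int
transmissionCount sendFor interval
  | sendFor < 0 = 0
  | otherwise   = sendFor `div` interval + 1

-- | The count of received numbers, paired with the sum of each number plus
-- its 1-based arrival index. This is the same zipWith (+) the receiver uses.
finalTuple :: [Float] -> (Int, Float)
finalTuple received = (n, total)
  where
    n     = length received
    total = sum (zipWith (+) [1 .. fromIntegral n] received)
```

With `--send-for 100000`, `transmissionCount 100000 100000` is 2, which matches the two `Transmitting` lines per drone. Each drone sends to every peer including itself, so two drones sending twice gives the four `Received` lines. Applying `finalTuple` to the first drone's four received values gives a count of 4 and a sum of about 12.3532, in line with the logged tuple.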
