Created
July 22, 2018 19:36
-
-
Save sarthakbagaria/dad0b6cb6817442259dcfb8f4387f030 to your computer and use it in GitHub Desktop.
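A script to demonstrate distributed computing using Cloud Haskell. Each node sends a randomly generated number to every node (including itself), and at the end each node aggregates all the messages it received. Install Haskell Stack before running this script. First spin up some slave (messaging) nodes using 'distributed-computing.hs --host-rank slave --host 127.0.0.1 --port 8081' (each on a unique host/port pair), then spin up a master (controller) node using 'distributed-computing.hs --host-rank master --host 127.0.0.1 --port 8080'.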
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env stack | |
{- stack | |
script | |
--resolver lts-11.17 | |
--package distributed-process | |
--package network-transport-tcp | |
--package distributed-process-simplelocalnet | |
--package pqueue | |
--package options | |
--package random | |
--package time | |
--package binary | |
-} | |
{-# LANGUAGE DeriveDataTypeable #-} | |
{-# LANGUAGE DeriveGeneric #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
module Main where | |
import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Monad (forever, forM_, forM)
import Data.List (foldl')
import Data.Typeable (Typeable)
import Debug.Trace (traceIO)
import GHC.Generics (Generic)
import System.Environment (getArgs)
import System.Exit (exitFailure)
import System.IO (hPutStrLn, stderr)

import Control.Distributed.Process
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Distributed.Process.Closure (remotable, mkClosure, mkStatic, SerializableDict(..))
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Data.Binary (Binary)
import qualified Data.PQueue.Prio.Min as Q
import Data.Time.Clock.POSIX (getPOSIXTime, POSIXTime)
import Options (Options, simpleOption, defineOptions, runCommand)
import qualified System.Random as Rand
-- Finished imports -- | |
-- | Payload value carried by each message: a random number in (0, 1].
type NumMessage = Double

-- | Send timestamp: POSIX seconds as an exact 'Rational', used by the
-- receivers to order messages by sending time.
type Time = Rational

-- | Messages flowing over the typed channels to the receivers.
--
-- The payload fields are strict so that decoded messages stored in a
-- receiver's queue do not retain unevaluated thunks.
data MessageOrEnd
  = MessageWithPayload !NumMessage !Time
    -- ^ A random value together with the time it was sent.
  | End
    -- ^ Sent by the master: stop receiving and report the aggregate.
  deriving (Generic, Typeable)

instance Binary MessageOrEnd
-- | Command-line options shared by master and slave nodes.
data CmdOptions = CmdOptions
  { sendFor  :: Int     -- ^ seconds the senders run before being stopped (used by 'master')
  , waitFor  :: Int     -- ^ grace period in seconds before 'End' is sent (used by 'master')
  , withSeed :: Int     -- ^ base seed; the i-th slave (1-based) gets @withSeed + i@
  , hostRank :: String  -- ^ either @"master"@ or @"slave"@
  , host     :: String  -- ^ host handed to 'initializeBackend'
  , port     :: String  -- ^ port handed to 'initializeBackend'
  }
  deriving (Show)
-- | Command-line flags for 'CmdOptions'.
--
-- @--send-for@ and @--wait-for@ are durations in seconds: the master passes
-- them to 'threadDelay' after multiplying by 10^6.
instance Options CmdOptions where
  defineOptions =
    CmdOptions
      <$> simpleOption "send-for" 5 "Number of seconds for which the nodes send messages."
      <*> simpleOption "wait-for" 5 "Grace period in seconds after the messages are sent."
      <*> simpleOption "with-seed" 12345 "Seed to use for random number generation."
      <*> simpleOption "host-rank" "slave" "Whether the node is a master or a slave."
      <*> simpleOption "host" "127.0.0.1" "Network host."
      <*> simpleOption "port" "8080" "Network port."
-- | Broadcast random numbers to every given send port, forever.
--
-- Numbers are drawn from a 'Rand.StdGen' seeded with @rSeed@ and normalised
-- to the interval (0, 1]. Each message also carries the current POSIX time
-- so that receivers can order messages by sending time (this assumes all
-- nodes have synchronised clocks). The loop never returns on its own; the
-- master stops it by sending an exit signal.
sendProcess :: (Int, [SendPort MessageOrEnd]) -> Process ()
sendProcess (rSeed, sendPorts) = do
  liftIO $ traceIO $ "Sending messages."
  sendLoop initialGen
  where
    initialGen = Rand.mkStdGen rSeed

    -- 'Rand.genRange' is not allowed to inspect its argument, so the range
    -- is the same for every generator state and can be computed once.
    (rStart, rEnd) = Rand.genRange initialGen

    -- Map an output of 'Rand.next' (which lies in [rStart, rEnd]) into
    -- (0, 1]. The arithmetic is done in Double: in Int, @rEnd - rStart + 1@
    -- overflows when the range covers all of Int, which is what the default
    -- 'Rand.genRange' returns.
    normalise :: Int -> Double
    normalise n =
      (fromIntegral n - fromIntegral rStart + 1)
        / (fromIntegral rEnd - fromIntegral rStart + 1)

    sendLoop gen = do
      let (n, gen') = Rand.next gen
          value = normalise n
      time <- liftIO getPOSIXTime
      forM_ sendPorts $ \sp ->
        sendChan sp (MessageWithPayload value (toRational time))
      sendLoop gen'
-- | Collect timestamped messages from @rPort@ until 'End' arrives, then
-- aggregate them.
--
-- Messages are kept in a min-priority queue keyed by (send time, value), so
-- they can be replayed in sending order. Using the value as a tie-breaker
-- makes the order, and hence the order-dependent aggregate, independent of
-- the heap's arbitrary ordering of equal timestamps.
--
-- On 'End', the aggregate @sum_{k=1..n} k * x_k@ over the time-ordered
-- values @x_1 .. x_n@ is reported together with @n@ via 'traceIO' and 'say'.
--
-- The whole stream is buffered until 'End', so memory grows with the number
-- of messages received. To run for a long time, one could cap the queue by
-- folding the oldest entries into a running aggregate, at the risk of
-- misordering late arrivals on high-latency networks. Messages still in
-- transit when 'End' arrives are lost.
--
-- The unit argument exists only so that this can be turned into a remote
-- closure with 'mkClosure'.
recProcess :: () -> ReceivePort MessageOrEnd -> Process ()
recProcess _ rPort = do
  liftIO $ traceIO $ "Receiving messages."
  recLoop Q.empty
  where
    recLoop queue = do
      msg <- receiveChan rPort
      case msg of
        MessageWithPayload rand time ->
          -- force the new queue so pending inserts do not pile up as thunks
          recLoop $! Q.insert (time, rand) rand queue
        End -> do
          liftIO $ traceIO $ "Signal to end received."
          let timeOrdMessages = map snd (Q.toAscList queue)
              result = (length timeOrdMessages, weightedSum timeOrdMessages)
          liftIO $ traceIO $ "Result is " ++ show result
          say $ show result

    -- Left fold of k * x_k with 1-based k. 'foldl'' on a plain Double
    -- accumulator is fully strict, unlike a lazy (index, sum) tuple.
    weightedSum :: [NumMessage] -> NumMessage
    weightedSum xs = foldl' (+) 0 (zipWith (*) [1 ..] xs)
-- | Serialization dictionary for 'MessageOrEnd'. 'spawnChannel' receives it
-- as a static value in order to create typed channels on remote nodes.
sdictMessageOrEnd :: SerializableDict MessageOrEnd
sdictMessageOrEnd = SerializableDict

-- Generate the static/closure plumbing for everything used remotely. This
-- splice has to follow the definitions it names and precede any mkStatic /
-- mkClosure use of them.
remotable
  [ 'sendProcess
  , 'recProcess
  , 'sdictMessageOrEnd
  ]

-- | Remote table extended with the entries generated by the splice above.
myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable
-- | Master process: coordinates one run of the broadcast experiment on the
-- given slave nodes.
--
--   1. Spawns a receiver ('recProcess') on every slave and collects the
--      send ports of their typed channels.
--   2. Spawns a sender ('sendProcess') on every slave. The i-th slave
--      (1-based) gets seed @withSeed opts + i@ and broadcasts to every port.
--   3. After @sendFor opts@ seconds, sends an exit signal to every sender.
--   4. After a further @waitFor opts@ seconds, which gives messages still in
--      transit time to arrive, sends 'End' to every receiver so it
--      aggregates and reports.
--
-- If a receiving node is down for some time it loses the messages sent to
-- it during that period. This scheme therefore does not guarantee that all
-- nodes report the same result in case of network or system failures.
master :: Backend -> CmdOptions -> [NodeId] -> Process ()
master backend opts slaves = do
  nodeId <- getSelfNode
  liftIO $ traceIO $ "Master running on " ++ show nodeId
  sendPorts <- forM slaves $ \slave ->
    spawnChannel $(mkStatic 'sdictMessageOrEnd) slave ($(mkClosure 'recProcess) ())
  liftIO $ traceIO $ "Spawned channels: " ++ show sendPorts
  -- a different seed per slave: withSeed + 1, withSeed + 2, ...
  let seededSlaves = zip slaves (map (withSeed opts +) [1 ..])
  sendingPids <- forM seededSlaves $ \(slave, nSeed) ->
    spawn slave ($(mkClosure 'sendProcess) (nSeed, sendPorts))
  liftIO $ traceIO $ "Spawned sending processes: " ++ show sendingPids
  -- sending window; afterwards stop the senders
  liftIO $ threadDelay (secondsToMicros (sendFor opts))
  liftIO $ traceIO $ "Sending time over."
  forM_ sendingPids $ \pid -> exit pid ("Sending time over." :: String)
  liftIO $ traceIO $ "Exit signal sent to senders."
  -- grace period for the receivers to get messages still in transit
  liftIO $ threadDelay (secondsToMicros (waitFor opts))
  -- ask the receivers to stop receiving and start aggregating
  forM_ sendPorts $ \sp -> sendChan sp End
  liftIO $ traceIO $ "Asked receivers to end."
  where
    -- 'threadDelay' takes microseconds
    secondsToMicros :: Int -> Int
    secondsToMicros s = s * 1000000
-- | Entry point: parse the command-line options and start a node.
--
-- For @--host-rank master@ or @--host-rank slave@ this first initialises the
-- SimpleLocalnet backend on the given host and port. It then runs either
-- 'master' via 'startMaster', or 'startSlave'. Any other rank is reported
-- on stderr and the program exits with a failure status.
main :: IO ()
main = runCommand $ \opts _ -> do
  let withBackend run = do
        traceIO $ show opts
        backend <- initializeBackend (host opts) (port opts) myRemoteTable
        traceIO $ "Backend initialized."
        run backend
  case hostRank opts of
    "master" -> withBackend $ \backend -> startMaster backend (master backend opts)
    "slave"  -> withBackend startSlave
    other    -> do
      hPutStrLn stderr $ "Unknown host rank " ++ other
      exitFailure
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A script to demonstrate distributed computing using Cloud Haskell.
Each node sends a randomly generated number to every node (including itself), and at the end each node aggregates all the messages it received and shows the result.
Install the Haskell Stack build tool before running this script.
First spin up a couple of slave (messaging) nodes using
distributed-computing.hs --host-rank slave --host 127.0.0.1 --port 8081
distributed-computing.hs --host-rank slave --host 127.0.0.1 --port 8082
and then spin up a master (controller) node using
distributed-computing.hs --host-rank master --host 127.0.0.1 --port 8080 --with-seed 123456 --send-for 5 --wait-for 5
Ensure that each node is on a unique host/port pair.