Skip to content

Instantly share code, notes, and snippets.

@sarthakbagaria
Created July 22, 2018 19:36
Show Gist options
  • Save sarthakbagaria/dad0b6cb6817442259dcfb8f4387f030 to your computer and use it in GitHub Desktop.
Save sarthakbagaria/dad0b6cb6817442259dcfb8f4387f030 to your computer and use it in GitHub Desktop.
A script to demonstrate distributed computing using Cloud Haskell. Each node sends a randomly generated number to every node (including itself), and at the end each node aggregates all the messages it received. Install Haskell 'stack' before running this script. First spin up some slave (messaging) nodes using 'distributed-computing.hs --host-ra…
#!/usr/bin/env stack
{- stack
script
--resolver lts-11.17
--package distributed-process
--package network-transport-tcp
--package distributed-process-simplelocalnet
--package pqueue
--package options
--package random
--package time
--package binary
-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Monad (forever, forM_, forM)
import Data.List (foldl')
import Data.Typeable (Typeable)
import Debug.Trace (traceIO)
import GHC.Generics (Generic)
import System.Environment (getArgs)
import System.Exit (exitFailure)
import System.IO (hPutStrLn, stderr)

import Control.Distributed.Process
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Distributed.Process.Closure (remotable, mkClosure, mkStatic, SerializableDict(..))
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Data.Binary (Binary)
import qualified Data.PQueue.Prio.Min as Q
import Data.Time.Clock.POSIX (getPOSIXTime, POSIXTime)
import Options (Options, simpleOption, defineOptions, runCommand)
import qualified System.Random as Rand
-- Finished imports --
-- | A single numeric sample carried by a message. 'sendProcess' produces
-- values normalised into (0, 1].
type NumMessage = Double

-- | Sending timestamp, stored as an exact 'Rational' copy of a POSIX time so
-- it can be used as a priority-queue key.
type Time = Rational

-- | What flows from senders to receivers: either a sample tagged with the
-- time it was sent, or the signal to stop receiving and aggregate.
-- Constructor order matters: the Generic-derived 'Binary' encoding below
-- depends on it.
data MessageOrEnd
  = MessageWithPayload NumMessage Time
  | End
  deriving (Generic, Typeable)

-- Generic-derived serialisation so messages can be sent over typed channels.
instance Binary MessageOrEnd
-- | Command-line configuration shared by master and slave nodes.
data CmdOptions = CmdOptions
  { sendFor  :: Int     -- ^ seconds the senders are allowed to run
  , waitFor  :: Int     -- ^ grace period (seconds) before receivers are told to end
  , withSeed :: Int     -- ^ base seed; the k-th slave's sender uses @withSeed + k@
  , hostRank :: String  -- ^ role of this node: @"master"@ or @"slave"@
  , host     :: String  -- ^ host the network backend is initialised on
  , port     :: String  -- ^ port the network backend is initialised on
  }
  deriving (Show)
-- | Command-line flags, declared in the same order as the 'CmdOptions'
-- fields. Both durations are whole seconds: the master multiplies them by
-- one million before passing them to 'threadDelay'.
instance Options CmdOptions where
  defineOptions = CmdOptions
    <$> simpleOption "send-for" 5 "Number of seconds for which the nodes send messages."
    <*> simpleOption "wait-for" 5 "Grace period in seconds after the messages are sent."
    <*> simpleOption "with-seed" 12345 "Seed to use for random number generation."
    <*> simpleOption "host-rank" "slave" "Whether the node is a master or a slave."
    <*> simpleOption "host" "127.0.0.1" "Network host."
    <*> simpleOption "port" "8080" "Network port."
-- | Sender process. It draws pseudo-random numbers from a 'Rand.StdGen'
-- seeded with @rSeed@ and sends each one to every port in @sendPorts@,
-- without end. Each number is normalised into (0, 1] and stamped with the
-- current POSIX time.
--
-- The loop never stops by itself; the master terminates it with 'exit'.
-- The timestamps let receivers order messages by sending time, which is
-- only meaningful if all nodes' clocks are synchronised (assumed, not
-- checked).
sendProcess :: (Int, [ SendPort MessageOrEnd ]) -> Process ()
sendProcess (rSeed, sendPorts) = do
  liftIO $ traceIO $ "Sending messages."
  go (Rand.mkStdGen rSeed)
  where
    go gen = do
      let (raw, gen') = Rand.next gen
          sample = toUnitInterval gen raw
      now <- liftIO getPOSIXTime
      let msg = MessageWithPayload sample (toRational now)
      mapM_ (`sendChan` msg) sendPorts
      go gen'

    -- Map a raw draw from the generator's range [lo, hi] onto (0, 1].
    toUnitInterval gen raw =
      let (lo, hi) = Rand.genRange gen
      in fromIntegral (raw - lo + 1) / fromIntegral (hi - lo + 1) :: Double
-- | Receiver process. It collects timestamped samples from @rPort@ until it
-- receives 'End', then aggregates them.
--
-- Samples are kept in a min-priority queue keyed on their sending 'Time',
-- so they are aggregated in timestamp order. The order among equal
-- timestamps is whatever the queue yields. The aggregate is
-- @sum_{k=1..n} k * x_k@ over that order. The pair @(n, aggregate)@ is
-- reported through both 'traceIO' and 'say'.
recProcess :: () -> ReceivePort MessageOrEnd -> Process ()
recProcess _ rPort = do
  liftIO $ traceIO "Receiving messages."
  recLoop Q.empty
  where
    recLoop queue = do
      msg <- receiveChan rPort
      case msg of
        MessageWithPayload rand time ->
          -- Every sample is kept until End arrives, so memory grows with the
          -- run length. For long runs, consider capping the queue by popping
          -- the oldest entries into a running aggregate. That can misorder
          -- messages on high-latency networks.
          recLoop (Q.insert time rand queue)
        -- Messages still in transit when End arrives are not counted.
        End -> do
          liftIO $ traceIO "Signal to end received."
          let timeOrdMessages = Q.elems queue
              count = length timeOrdMessages
              -- Weight the k-th message (1-based) by k. The fold runs over a
              -- plain Double so each partial sum is forced as it goes. A
              -- tuple accumulator would only be forced to WHNF, which builds
              -- a thunk chain.
              messageAgg = foldl' (+) 0 (zipWith (*) [1 ..] timeOrdMessages)
              result = show (count, messageAgg)
          liftIO $ traceIO $ "Result is " ++ result
          say result
-- | Serialisation dictionary for 'MessageOrEnd'. 'master' passes it (via
-- 'mkStatic') to 'spawnChannel' so a typed channel of 'MessageOrEnd' can be
-- created on a remote node.
sdictMessageOrEnd :: SerializableDict MessageOrEnd
sdictMessageOrEnd = SerializableDict
-- Template Haskell declaration splice: generates the static/closure
-- machinery (including '__remoteTable') for the listed names. It sits after
-- their definitions and before the code that applies 'mkClosure'/'mkStatic'
-- to them; keep that ordering when moving code around.
remotable ['sendProcess, 'recProcess, 'sdictMessageOrEnd]
-- | The default remote table extended with this module's 'remotable'
-- entries; 'main' passes it to 'initializeBackend' on every node.
myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable
-- | Controller process run on the master node.
--
-- For every slave in @slaves@ it:
--
--   1. spawns a receiver ('recProcess') and collects its 'SendPort';
--   2. spawns a sender ('sendProcess'). The k-th slave's sender gets seed
--      @withSeed opts + k@ and the full list of receiver ports.
--
-- After @sendFor opts@ seconds the senders are killed with 'exit'. After a
-- further @waitFor opts@ second grace period, every receiver is sent 'End'
-- so it aggregates what it has received.
--
-- Caveats: a receiving node that is down for a while loses the messages
-- sent in that period. Results are therefore not guaranteed to be the same
-- on all nodes after network or system failures.
master :: Backend -> CmdOptions -> [NodeId] -> Process ()
master _backend opts slaves = do
  nodeId <- getSelfNode
  liftIO $ traceIO $ "Master running on " ++ show nodeId
  -- a different seed per slave so each sender produces a different stream
  let nodeSeeds = map (withSeed opts +) [1 .. length slaves]
  sendPorts <- forM slaves $ \slave ->
    spawnChannel $(mkStatic 'sdictMessageOrEnd) slave $ $(mkClosure 'recProcess) ()
  liftIO $ traceIO $ "Spawned channels: " ++ show sendPorts
  sendingPids <- forM (zip slaves nodeSeeds) $ \(slave, nSeed) ->
    spawn slave $ $(mkClosure 'sendProcess) (nSeed, sendPorts)
  liftIO $ traceIO $ "Spawned sending processes: " ++ show sendingPids
  -- when the sending window is over, kill the sender processes
  liftIO $ threadDelay $ toMicros (sendFor opts)
  liftIO $ traceIO "Sending time over."
  forM_ sendingPids $ \pid -> exit pid ("Sending time over." :: String)
  liftIO $ traceIO "Exit signal sent to senders."
  -- grace period so messages still in transit can reach the receivers
  liftIO $ threadDelay $ toMicros (waitFor opts)
  -- then ask every receiver to stop receiving and start aggregating
  forM_ sendPorts $ \sendPort -> sendChan sendPort End
  liftIO $ traceIO "Asked receivers to end."
  where
    -- threadDelay takes microseconds; the command-line options are seconds.
    toMicros :: Int -> Int
    toMicros secs = secs * 1000000
-- | Entry point. Parses the command-line options, then runs this node as
-- the master (controller) or as a slave (messaging) node, depending on
-- @--host-rank@. Both roles first log their options and bring up the
-- SimpleLocalnet backend on @--host@/@--port@. Any other rank is reported
-- on stderr and the program exits with a failure status.
main :: IO ()
main = runCommand $ \opts _ ->
  case hostRank opts of
    "master" -> do
      backend <- initBackend opts
      startMaster backend (master backend opts)
    "slave" -> initBackend opts >>= startSlave
    rank -> do
      hPutStrLn stderr $ "Unknown host rank " ++ rank
      exitFailure
  where
    -- Log the options, then initialise the network backend with this
    -- module's remote table.
    initBackend :: CmdOptions -> IO Backend
    initBackend o = do
      traceIO $ show o
      backend <- initializeBackend (host o) (port o) myRemoteTable
      traceIO "Backend initialized."
      return backend
@sarthakbagaria
Copy link
Author

sarthakbagaria commented Jul 22, 2018

A script to demonstrate distributed computing using Cloud Haskell.

Each node sends a randomly generated number to every node (including itself), and at the end each node aggregates all the messages it received and shows the result.

Install the Haskell Stack build tool before running this script.

First spin up a couple of slave (messaging) nodes using
distributed-computing.hs --host-rank slave --host 127.0.0.1 --port 8081
distributed-computing.hs --host-rank slave --host 127.0.0.1 --port 8082
and then spin up a master (controller) node using
distributed-computing.hs --host-rank master --host 127.0.0.1 --port 8080 --with-seed 123456 --send-for 5 --wait-for 5
Ensure that each node is on a unique host/port pair.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment