Created
August 22, 2012 23:58
-
-
Save thiago-negri/3430724 to your computer and use it in GitHub Desktop.
Cloud Haskell example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
{-# LANGUAGE DeriveDataTypeable #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
module Main (main) where | |
import Prelude hiding (catch) | |
import System.Environment (getArgs, getProgName) | |
import Control.Monad (forM, replicateM_, forever, void, liftM) | |
import Data.Binary (Binary) | |
import Data.Maybe (isJust) | |
import Data.Typeable (Typeable) | |
import Control.Concurrent (threadDelay) | |
import Control.Distributed.Process | |
import Control.Distributed.Process.Node (initRemoteTable, runProcess) | |
import Control.Distributed.Process.Backend.SimpleLocalnet | |
( Backend | |
, redirectLogsHere | |
, startSlave | |
, initializeBackend | |
, findSlaves | |
, newLocalNode | |
) | |
import Control.Distributed.Process.Closure (remotable, mkClosure) | |
newtype Ping = Ping (SendPort Pong) | |
deriving (Typeable, Binary, Show) | |
newtype Pong = Pong ProcessId | |
deriving (Typeable, Binary, Show) | |
worker :: Ping -> Process () | |
worker (Ping sPong) = do | |
wId <- getSelfPid | |
say "Got a Ping!" | |
sendChan sPong (Pong wId) | |
$(remotable ['worker]) | |
master :: Backend -> Process () | |
master backend = forever $ do | |
workers <- findSlaves backend | |
say $ "Slaves: " ++ show workers | |
redirectLogsHere backend | |
(liftIO . threadDelay) 2000000 -- Wait a bit before return, testing slave disconnecting | |
(sPong, rPong) <- newChan | |
workers' <- liftM (filter isJust) . forM workers $ \w -> | |
flip catch (catcher w) $ do | |
void $ spawn w ($(mkClosure 'worker) (Ping sPong)) | |
say $ "Sent ping to " ++ (show w) ++ "." | |
return $ Just w | |
say $ "Waiting for reply from " ++ (show (length workers')) ++ " worker(s)" | |
replicateM_ (length workers') $ do | |
(Pong wId) <- receiveChan rPong -- FIXME timeout | |
say $ "Got back a Pong from " ++ (show $ processNodeId wId) ++ "!" | |
where catcher :: NodeId -> IOError -> Process (Maybe a) | |
catcher w e = do say $ "ERROR Could not send message to " ++ (show w) ++ ": " ++ (show e) | |
return Nothing | |
main :: IO () | |
main = do | |
prog <- getProgName | |
args <- getArgs | |
case args of | |
["master", host, port] -> do | |
backend <- initializeBackend host port remoteTable | |
node <- newLocalNode backend | |
runProcess node (master backend) | |
["worker", host, port] -> do | |
backend <- initializeBackend host port remoteTable | |
startSlave backend | |
_ -> | |
putStrLn $ "usage: " ++ prog ++ " (master | worker) host port" | |
-- Wiring | |
remoteTable :: RemoteTable | |
remoteTable = __remoteTable initRemoteTable |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment