Skip to content

Instantly share code, notes, and snippets.

@nh2
Last active August 29, 2015 14:23
Show Gist options
  • Save nh2/f36ea9208c6070adc129 to your computer and use it in GitHub Desktop.
Save nh2/f36ea9208c6070adc129 to your computer and use it in GitHub Desktop.
CloudHaskell queued messages problem with call (see comment for problem description)
{-# LANGUAGE TemplateHaskell, StandaloneDeriving, MultiParamTypeClasses, TypeFamilies, GeneralizedNewtypeDeriving #-}
import Control.Monad
import Control.Concurrent
import Control.Distributed.Process
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Internal.Types (Process(..), LocalProcess)
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Monad.Base (MonadBase(..))
import Control.Monad.Trans.Control (MonadBaseControl(..), control)
import Control.Monad.Trans.Reader (ReaderT)
import qualified Data.ByteString.Lazy as BSL
import System.Environment (getArgs)
-- Standalone-derived 'MonadBase' instance so that 'IO' actions can be lifted
-- into 'Process' through the monad-control machinery; it accompanies the
-- hand-written 'MonadBaseControl IO Process' instance in this file.
deriving instance MonadBase IO Process
-- For monad-control < 1.0.0
--
-- 'Process' wraps @ReaderT LocalProcess IO@, so both methods simply unwrap
-- the newtype, delegate to the 'ReaderT' instance and re-wrap the result.
-- The saved monadic state of the underlying transformer is carried around
-- inside the 'StProcess' wrapper.
instance MonadBaseControl IO Process where
  newtype StM Process a = StProcess {_unSTProcess :: StM (ReaderT LocalProcess IO) a}

  liftBaseWith f =
    Process
      ( liftBaseWith
          (\runInBase -> f (\action -> fmap StProcess (runInBase (unProcess action))))
      )

  restoreM (StProcess saved) = Process (restoreM saved)
-- | Test payload that is invoked remotely: reports the size of the received
-- ByteString, blocks for the given number of seconds, then reports that it
-- has finished.
--
-- Important to use lazy ByteStrings since `binary` has a bug that makes
-- decoding strict ones quadratic (fixed in latest `binary` master).
sleep :: (BSL.ByteString, Int) -> Process ()
sleep (payload, seconds) =
  liftIO $
    putStrLn (prefix ++ " called with BS of length " ++ show (BSL.length payload))
      >> threadDelay (seconds * 1000000) -- threadDelay takes microseconds
      >> putStrLn (prefix ++ " finished")
  where
    -- Shared start of both log lines, identifying this invocation by its delay.
    prefix = "sleep " ++ show seconds
-- Template Haskell splice that registers 'sleep' as a remotely callable
-- function. The @mkClosure 'sleep@ / @functionTDict 'sleep@ splices in
-- 'master' and the @__remoteTable@ used in 'main' rely on this declaration,
-- so it has to appear after 'sleep' and before those uses.
remotable ['sleep]
-- | Master process: prints the discovered slave nodes, fires five concurrent
-- remote 'sleep' calls at the first slave (each carrying the contents of
-- @dummy100M.dump@), waits ten seconds and then terminates all slaves.
--
-- If no slave was discovered there is nothing to call, so this is reported
-- and the calls are skipped. (The previous @head slaves@ was partial and
-- threw an empty-list exception once the node id was forced.)
master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
  liftIO . putStrLn $ "Slaves: " ++ show slaves
  case slaves of
    [] -> liftIO $ putStrLn "No slaves found; nothing to call"
    (s1 : _) -> do
      -- The problem:
      -- These `call`s should be executed all in parallel on the slave node, but they are not:
      -- * The master prints 5 `calling call` messages as expected
      -- * Only a subset of calls execute at the slave (sometimes 3, sometimes 4)
      -- * The other ones get "queued" up somehow
      -- * As soon as the first one finishes, the "queued" ones start
      -- * Problem is only visible if a substantial amount of data is sent (see comment for the dummy100M.dump file)
      forM_ [1..5] $ \i -> control $ \runInProcessMonad -> do
        putStrLn "calling call"
        -- Each call runs in its own IO thread so the five calls are issued
        -- concurrently instead of one after the other.
        _ <- forkIO $ do
          -- Created with `dd if=/dev/zero of=dummy100M.dump bs=1M count=100`
          bs <- BSL.readFile "dummy100M.dump"
          runInProcessMonad $ call $(functionTDict 'sleep) s1 ($(mkClosure 'sleep) (bs, (i + 2) :: Int))
          return ()
        -- `control` needs a saved monadic state back; produce a trivial one.
        runInProcessMonad $ return ()
      -- Give the remote calls time to run before shutting the slaves down.
      liftIO $ threadDelay 10000000
  terminateAllSlaves backend
-- | Entry point. Expects exactly three command-line arguments:
--
-- > master <host> <port>
-- > slave  <host> <port>
--
-- Both roles initialise the SimpleLocalnet backend with the remote table
-- generated for this module; the master then runs 'master', the slave just
-- serves requests. Any other argument list aborts with a usage message
-- instead of an incomplete-pattern crash.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port (__remoteTable initRemoteTable)
      startMaster backend (master backend)
    ["slave", host, port] -> do
      backend <- initializeBackend host port (__remoteTable initRemoteTable)
      startSlave backend
    -- Prelude-only failure path: reported on stderr with a non-zero exit code.
    _ -> ioError (userError "usage: (master|slave) <host> <port>")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment