public
Last active

Distributed ping (with boilerplate code, i.e. without Template Haskell)

  • Download Gist
distributed-ping.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
{-# LANGUAGE DeriveDataTypeable, GeneralizedNewtypeDeriving #-}
import System.Environment (getArgs, getProgName)
import Control.Monad (forM_, replicateM_)
import Data.Binary (Binary, encode, decode)
import Data.Typeable (Typeable)
import Data.ByteString.Lazy (ByteString)
import Control.Concurrent (threadDelay)
import Data.Rank1Dynamic (toDynamic)
import Control.Distributed.Static
( Static
, Closure(..)
, RemoteTable
, registerStatic
, staticLabel
, staticCompose
)
import Control.Distributed.Process
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Control.Distributed.Process.Serializable (SerializableDict(..))
import Control.Distributed.Process.Backend.SimpleLocalnet
( Backend
, startMaster
, initializeBackend
, newLocalNode
, findPeers
, findSlaves
)
newtype Ping = Ping ProcessId
deriving (Typeable, Binary, Show)
newtype Pong = Pong ProcessId
deriving (Typeable, Binary, Show)
worker :: Ping -> Process ()
worker (Ping master) = do
wId <- getSelfPid
say "Got a Ping!"
send master (Pong wId)
-- // Explicitly construct Closures
workerStatic :: Static (Ping -> Process ())
workerStatic = staticLabel "$ping.worker"
decodePingStatic :: Static (ByteString -> Ping)
decodePingStatic = staticLabel "$ping.decodePing"
workerClosure :: Ping -> Closure (Process ())
workerClosure p = closure decoder (encode p)
where
decoder :: Static (ByteString -> Process ())
decoder = workerStatic `staticCompose` decodePingStatic
-- //
initialProcess :: String -> [NodeId] -> Process ()
initialProcess "WORKER" peers = do
say $ "Peers: " ++ show peers
pid <- getSelfPid
register "slaveController" pid
receiveWait []
initialProcess "MASTER" workers = do
say $ "Workers: " ++ show workers
pid <- getSelfPid
forM_ workers $ \w -> do
say $ "Sending a Ping to " ++ (show w) ++ "..."
spawn w (workerClosure (Ping pid))
say $ "Waiting for reply from " ++ (show (length workers)) ++ " worker(s)"
replicateM_ (length workers) $ do
let resultMatch = match (\(Pong wId) -> return wId)
in do wId <- receiveWait [resultMatch]
say $ "Got back a Pong from "
++ (show $ processNodeId wId) ++ "!"
(liftIO . threadDelay) 2000000 -- Wait a bit before return
 
main = do
prog <- getProgName
args <- getArgs
case args of
["master", host, port] -> do
backend <- initializeBackend host port rtable
node <- newLocalNode backend
runProcess node $ do
slaves <- findSlaves backend
(initialProcess "MASTER" (map processNodeId slaves))
["worker", host, port] -> do
backend <- initializeBackend host port rtable
node <- newLocalNode backend
peers <- findPeers backend 50000
runProcess node (initialProcess "WORKER" peers)
_ ->
putStrLn $ "usage: " ++ prog ++ " (master | worker) host port"
where
rtable :: RemoteTable
rtable = registerStatic "$ping.worker" (toDynamic worker)
. registerStatic "$ping.decodePing" (toDynamic
(decode :: ByteString -> Ping))
$ initRemoteTable

Hello,

When I tried compiling this code, I get the following error:

pp.hs:18:29:
No instance for (Binary ProcessId)
arising from the 'deriving' clause of a data type declaration
Possible fix:
add an instance declaration for (Binary ProcessId)
or use a standalone 'deriving instance' declaration,
so you can specify the instance context yourself
When deriving the instance for (Binary Ping)

pp.hs:21:29:
No instance for (Binary ProcessId)
arising from the 'deriving' clause of a data type declaration
Possible fix:
add an instance declaration for (Binary ProcessId)
or use a standalone 'deriving instance' declaration,
so you can specify the instance context yourself
When deriving the instance for (Binary Pong)

I'm still a haskell novice, so please pardon me if this is a novice class issue.

Thanks,

Carl

I was getting the same problem...

It turns out I had a old version of the Binary package, and then, even after upgrading it, the distributed-process was still linking with the old one. As indicated in section 8 of http://www.haskell.org/haskellwiki/Cloud_Haskell, you need at least version 0.6.3.0 of Binary.

So, in my particular case the solution was:

0) cabal update && cabal install binary

1) cabal list binary

This resulted in the following:

  • binary Synopsis: Binary serialisation for Haskell values using lazy ByteStrings Default available version: 0.7.1.0 Installed versions: 0.5.1.1, 0.6.4.0 Homepage: https://github.com/kolmodin/binary License: BSD3

Then, I tried to unregister the older version:

2) ghc-pkg unregister binary-0.5.1.1

ghc-pkg: unregistering binary-0.5.1.1 would break the following packages: ghc-7.6.3 bin-package-db-0.0.0.0 distributed-process-simplelocalnet-0.2.0.9 distributed-process-0.4.2 distributed-static-0.2.1.1 network-transport-tcp-0.3.1 rank1dynamic-0.1.0.2 network-transport-0.3.0.1 ghc-mod-2.0.3 ghc-syb-utils-0.2.1.1 yesod-1.2.2 yesod-auth-1.2.1 pureMD5-2.1.2.1 SHA-1.6.1 (use --force to override)

Oops.. so perhaps I don't want to do that. But this gave me a clue for the next command, the one that effectively fixed it:

3) cabal install distributed-process-simplelocalnet distributed-process distributed-network-transport-tcp rank1dynamic network-transport --reinstall

And voilĂ , it now compiles (and runs) without problem.

@cepete

Make sure you don't have another version of binary package registered.

ghc-pkg list

For instance, I had binary-0.7.1.0 and binary-0.6.4.0. I removed the binary-0.7.1.0 with ghc-pkg unregister binary-0.7.1.0 and the same compile time error you got above went away.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.